diff --git a/library.json b/library.json index 14514c0e..136d2ca2 100644 --- a/library.json +++ b/library.json @@ -6,7 +6,7 @@ "type": "git", "url": "https://github.com/thingsboard/pubsubclient.git" }, - "version": "2.9.4", + "version": "2.10.0", "exclude": "tests", "examples": "examples/*/*.ino", "frameworks": "arduino", diff --git a/library.properties b/library.properties index 064c1e7f..b9dcc222 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=TBPubSubClient -version=2.9.4 +version=2.10.0 author=ThingsBoard maintainer=ThingsBoard Team sentence=A client library for MQTT messaging. diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index 5a49de48..5b853844 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -159,7 +159,8 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN } PubSubClient::~PubSubClient() { - free(this->buffer); + free(this->receive_buffer); + free(this->send_buffer); } boolean PubSubClient::connect(const char *id) { @@ -195,9 +196,9 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass if (result == 1) { nextMsgId = 1; - // Leave room in the buffer for header and variable length field + // Leave room in the receive_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - unsigned int j; + size_t j; #if MQTT_VERSION == MQTT_VERSION_3_1 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; @@ -207,7 +208,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass #define MQTT_HEADER_VERSION_LENGTH 7 #endif for (j = 0;jbuffer[length++] = d[j]; + this->receive_buffer[length++] = d[j]; } uint8_t v; @@ -227,30 +228,30 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass v = v|(0x80>>1); } } - this->buffer[length++] = v; + this->receive_buffer[length++] = v; - this->buffer[length++] = ((this->keepAlive) >> 8); - this->buffer[length++] = ((this->keepAlive) & 0xFF); + this->receive_buffer[length++] = ((this->keepAlive) >> 8); + this->receive_buffer[length++] = ((this->keepAlive) & 0xFF); CHECK_STRING_LENGTH(length,id) - length = writeString(id,this->buffer,length); + length = writeString(id,this->receive_buffer,length); if (willTopic) { CHECK_STRING_LENGTH(length,willTopic) - length = writeString(willTopic,this->buffer,length); + length = writeString(willTopic,this->receive_buffer,length); CHECK_STRING_LENGTH(length,willMessage) - length = writeString(willMessage,this->buffer,length); + length = writeString(willMessage,this->receive_buffer,length); } if(user != NULL) { CHECK_STRING_LENGTH(length,user) - length = writeString(user,this->buffer,length); + length = writeString(user,this->receive_buffer,length); if(pass != NULL) { CHECK_STRING_LENGTH(length,pass) - length = writeString(pass,this->buffer,length); + length = writeString(pass,this->send_buffer,length); } } - write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE); + write(MQTTCONNECT,this->receive_buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); @@ -266,13 +267,13 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass uint32_t len = readPacket(&llen); if (len == 4) { - if (buffer[3] == 0) { + if (receive_buffer[3] == 0) { lastInActivity = millis(); pingOutstanding = false; _state = MQTT_CONNECTED; return true; } else { - _state = buffer[3]; + _state = receive_buffer[3]; } } _client->stop(); @@ -311,8 +312,8 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { uint16_t len = 0; - if(!readByte(this->buffer, &len)) return 0; - bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH; + if(!readByte(this->receive_buffer, &len)) return 0; + bool isPublish = (this->receive_buffer[0]&0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; uint32_t length = 0; uint8_t digit = 0; @@ -327,7 +328,7 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { return 0; } if(!readByte(&digit)) return 0; - this->buffer[len++] = digit; + this->receive_buffer[len++] = digit; length += (digit & 127) * multiplier; multiplier <<=7; //multiplier *= 128 } while ((digit & 128) != 0); @@ -335,11 +336,11 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { if (isPublish) { // Read in topic length to calculate bytes to skip over for Stream writing - if(!readByte(this->buffer, &len)) return 0; - if(!readByte(this->buffer, &len)) return 0; - skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2]; + if(!readByte(this->receive_buffer, &len)) return 0; + if(!readByte(this->receive_buffer, &len)) return 0; + skip = (this->receive_buffer[*lengthLength+1]<<8)+this->receive_buffer[*lengthLength+2]; start = 2; - if (this->buffer[0]&MQTTQOS1) { + if (this->receive_buffer[0]&MQTTQOS1) { // skip message id skip += 2; } @@ -355,7 +356,7 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { } if (len < this->bufferSize) { - this->buffer[len] = digit; + this->receive_buffer[len] = digit; len++; } idx++; @@ -381,41 +382,41 @@ boolean PubSubClient::loop_read() { } unsigned long t = millis(); lastInActivity = t; - uint8_t type = buffer[0]&0xF0; + uint8_t type = receive_buffer[0]&0xF0; switch(type) { case MQTTPUBLISH: { if (callback) { - const boolean msgId_present = (buffer[0]&0x06) == MQTTQOS1; + const boolean msgId_present = (receive_buffer[0]&0x06) == MQTTQOS1; const uint16_t tl_offset = llen+1; - const uint16_t tl = (buffer[tl_offset]<<8)+buffer[tl_offset+1]; /* topic length in bytes */ + const uint16_t tl = (receive_buffer[tl_offset]<<8)+receive_buffer[tl_offset+1]; /* topic length in bytes */ const uint16_t topic_offset = tl_offset+2; const uint16_t msgId_offset = topic_offset+tl; const uint16_t payload_offset = msgId_present ? msgId_offset+2 : msgId_offset; if ((payload_offset) >= this->bufferSize) {return false;} if (len < payload_offset) {return false;} - memmove(buffer+topic_offset-1,buffer+topic_offset,tl); /* move topic inside buffer 1 byte to front */ - buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) buffer+topic_offset-1; + memmove(receive_buffer+topic_offset-1,receive_buffer+topic_offset,tl); /* move topic inside receive_buffer 1 byte to front */ + receive_buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) receive_buffer+topic_offset-1; uint8_t *payload; // msgId only present for QOS>0 if (msgId_present) { - const uint16_t msgId = (buffer[msgId_offset]<<8)+buffer[msgId_offset+1]; - payload = buffer+payload_offset; + const uint16_t msgId = (receive_buffer[msgId_offset]<<8)+receive_buffer[msgId_offset+1]; + payload = receive_buffer+payload_offset; callback(topic,payload,len-payload_offset); if (_client->connected()) { - buffer[0] = MQTTPUBACK; - buffer[1] = 2; - buffer[2] = (msgId >> 8); - buffer[3] = (msgId & 0xFF); - if (_client->write(buffer,4) != 0) { + receive_buffer[0] = MQTTPUBACK; + receive_buffer[1] = 2; + receive_buffer[2] = (msgId >> 8); + receive_buffer[3] = (msgId & 0xFF); + if (_client->write(receive_buffer,4) != 0) { lastOutActivity = t; } } } else { // No msgId - payload = buffer+payload_offset; + payload = receive_buffer+payload_offset; callback(topic,payload,len-payload_offset); } } @@ -424,9 +425,9 @@ boolean PubSubClient::loop_read() { case MQTTPINGREQ: { if (_client->connected()) { - buffer[0] = MQTTPINGRESP; - buffer[1] = 0; - _client->write(buffer,2); + receive_buffer[0] = MQTTPINGRESP; + receive_buffer[1] = 0; + _client->write(receive_buffer,2); } break; } @@ -451,9 +452,9 @@ boolean PubSubClient::loop() { _client->stop(); return false; } else { - buffer[0] = MQTTPINGREQ; - buffer[1] = 0; - if (_client->write(buffer,2) != 0) { + receive_buffer[0] = MQTTPINGREQ; + receive_buffer[1] = 0; + if (_client->write(receive_buffer,2) != 0) { lastOutActivity = t; lastInActivity = t; } @@ -483,14 +484,14 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t // Too long return false; } - // Leave room in the buffer for header and variable length field + // Leave room in the send_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - length = writeString(topic,this->buffer,length); + length = writeString(topic,this->send_buffer,length); // Add payload uint16_t i; for (i=0;ibuffer[length++] = payload[i]; + this->send_buffer[length++] = payload[i]; } // Write the header @@ -498,7 +499,7 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t if (retained) { header |= 1; } - return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE); + return write(header,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } @@ -510,10 +511,10 @@ boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, boolean retained) { uint8_t llen = 0; uint8_t digit; - unsigned int rc = 0; + size_t rc = 0; uint16_t tlen; - unsigned int pos = 0; - unsigned int i; + size_t pos = 0; + size_t i; uint8_t header; size_t len; size_t expectedLength; @@ -528,7 +529,7 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_ if (retained) { header |= 1; } - this->buffer[pos++] = header; + this->send_buffer[pos++] = header; len = plength + 2 + tlen; do { digit = len & 127; //digit = len %128 @@ -536,13 +537,13 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_ if (len > 0) { digit |= 0x80; } - this->buffer[pos++] = digit; + this->send_buffer[pos++] = digit; llen++; } while(len>0); - pos = writeString(topic,this->buffer,pos); + pos = writeString(topic,this->send_buffer,pos); - rc += _client->write(this->buffer,pos); + rc += _client->write(this->send_buffer,pos); for (i=0;iwrite((char)pgm_read_byte_near(payload + i)); @@ -559,13 +560,13 @@ boolean PubSubClient::beginPublish(const char* topic, size_t plength, boolean re if (connected()) { // Send the header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - length = writeString(topic,this->buffer,length); + length = writeString(topic,this->send_buffer,length); uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } - size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE); - uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + size_t hlen = buildHeader(header, this->send_buffer, plength+length-MQTT_MAX_HEADER_SIZE); + uint16_t rc = _client->write(this->send_buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); lastOutActivity = millis(); return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); } @@ -651,17 +652,17 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { return false; } if (connected()) { - // Leave room in the buffer for header and variable length field + // Leave room in the send_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } - this->buffer[length++] = (nextMsgId >> 8); - this->buffer[length++] = (nextMsgId & 0xFF); - length = writeString((char*)topic, this->buffer,length); - this->buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + this->send_buffer[length++] = (nextMsgId >> 8); + this->send_buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, this->send_buffer,length); + this->send_buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } @@ -681,18 +682,18 @@ boolean PubSubClient::unsubscribe(const char* topic) { if (nextMsgId == 0) { nextMsgId = 1; } - this->buffer[length++] = (nextMsgId >> 8); - this->buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, this->buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + this->send_buffer[length++] = (nextMsgId >> 8); + this->send_buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, this->send_buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } void PubSubClient::disconnect() { - this->buffer[0] = MQTTDISCONNECT; - this->buffer[1] = 0; - _client->write(this->buffer,2); + this->send_buffer[0] = MQTTDISCONNECT; + this->send_buffer[1] = 0; + _client->write(this->send_buffer,2); _state = MQTT_DISCONNECTED; _client->flush(); _client->stop(); @@ -775,17 +776,24 @@ boolean PubSubClient::setBufferSize(uint16_t size) { return false; } if (this->bufferSize == 0) { - this->buffer = (uint8_t*)malloc(size); + this->receive_buffer = (uint8_t*)malloc(size); + this->send_buffer = (uint8_t*)malloc(size); } else { - uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size); + uint8_t* newBuffer = (uint8_t*)realloc(this->receive_buffer, size); if (newBuffer != NULL) { - this->buffer = newBuffer; + this->receive_buffer = newBuffer; + } else { + return false; + } + newBuffer = (uint8_t*)realloc(this->send_buffer, size); + if (newBuffer != NULL) { + this->send_buffer = newBuffer; } else { return false; } } this->bufferSize = size; - return (this->buffer != NULL); + return (this->receive_buffer != NULL) && (this->send_buffer != NULL); } uint16_t PubSubClient::getBufferSize() { diff --git a/src/PubSubClient.h b/src/PubSubClient.h index c858f5d0..5acfc5b4 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -92,7 +92,8 @@ class PubSubClient : public Print { private: Client* _client; - uint8_t* buffer; + uint8_t* receive_buffer; + uint8_t* send_buffer; uint16_t bufferSize; uint16_t keepAlive; uint16_t socketTimeout;