Skip to content

Commit

Permalink
subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
Fan Zhou committed Oct 17, 2016
1 parent 871a71b commit 0fd01f3
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 108 deletions.
188 changes: 99 additions & 89 deletions deps/hiredis/main.cpp
Original file line number Diff line number Diff line change
@@ -1,90 +1,100 @@
#include "hiredis.h"
#include <stdlib.h>
#include <memory.h>

int main(int argc, char **argv) {
return 0;
}

#ifdef __cplusplus
extern "C" {
#endif
#include "adapters/ae.h"

void mqLog(const char*format, ...) {
FILE* f = fopen("D:\\mqDebug.log", "a+");
va_list args;
va_start(args, format);
vfprintf(f, format, args);
va_end(args);
fflush(f);
fclose(f);
}

redisContext* mqConnect(const wchar_t *ip, int port)
{
char buf[128];
size_t sz = wcstombs(buf, ip, sizeof(buf));
if (sz > 0 && sz <= sizeof(buf)) {
return redisConnect(buf, port);
}
return NULL;
}

///* This is the reply object returned by redisCommand() */
//typedef struct redisReply {
// int type; /* REDIS_REPLY_* */
// PORT_LONGLONG integer; /* The integer when type is REDIS_REPLY_INTEGER */
// int len; /* Length of string */
// char *str; /* Used for both REDIS_REPLY_ERROR and REDIS_REPLY_STRING */
// size_t elements; /* number of elements, for REDIS_REPLY_ARRAY */
// struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
//} redisReply;

int mqReplyType(const redisReply* reply) {
return reply->type;
}

bool mqReplyInt(const redisReply* reply, PORT_LONGLONG* result) {
if (reply->type == REDIS_REPLY_INTEGER) {
*result = reply->integer;
return true;
}
return false;
}

int mqReplyStrLen(const redisReply* reply) {
switch (reply->type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
return reply->len;
default:
return -1;
}
}

bool mqReplyStr(const redisReply* reply, char* result) {
switch (reply->type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
memcpy(result, reply->str, reply->len);
return true;
default:
return false;
}
}

#include "hiredis.h"
#include <stdlib.h>
#include <memory.h>

int main(int argc, char **argv) {
return 0;
}

#ifdef __cplusplus
extern "C" {
#endif
#include "adapters/ae.h"

void mqLog(const char*format, ...) {
FILE* f = fopen("D:\\mqDebug.log", "a+");
va_list args;
va_start(args, format);
vfprintf(f, format, args);
va_end(args);
fflush(f);
fclose(f);
}

redisContext* mqConnect(const wchar_t *ip, int port)
{
char buf[128];
size_t sz = wcstombs(buf, ip, sizeof(buf));
if (sz > 0 && sz <= sizeof(buf)) {
return redisConnect(buf, port);
}
return NULL;
}

///* This is the reply object returned by redisCommand() */
//typedef struct redisReply {
// int type; /* REDIS_REPLY_* */
// PORT_LONGLONG integer; /* The integer when type is REDIS_REPLY_INTEGER */
// int len; /* Length of string */
// char *str; /* Used for both REDIS_REPLY_ERROR and REDIS_REPLY_STRING */
// size_t elements; /* number of elements, for REDIS_REPLY_ARRAY */
// struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
//} redisReply;

int mqReplyType(const redisReply* reply) {
return reply->type;
}

bool mqReplyInt(const redisReply* reply, PORT_LONGLONG* result) {
if (reply->type == REDIS_REPLY_INTEGER) {
*result = reply->integer;
return true;
}
return false;
}

int mqReplyStrLen(const redisReply* reply) {
switch (reply->type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
return reply->len;
default:
return -1;
}
}

bool mqReplyStr(const redisReply* reply, char* result) {
switch (reply->type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
memcpy(result, reply->str, reply->len);
return true;
default:
return false;
}
}

void onReply(redisAsyncContext *c, void *reply, void *privdata) {
mqLog("%s:%p\n", __FUNCTION__, reply);
*((void**)privdata) = &reply;
}

int mqSubscribe(redisAsyncContext *c, const char* channel, void* privdata) {
return redisAsyncCommand(c, onReply, privdata, "SUBSCRIBE %s", channel);
}

#ifdef __cplusplus
}
#endif
mqLog("%s:%p,%p\n", __FUNCTION__, reply, privdata);
*((void**)privdata) = reply;
}

int mqSubscribe(redisAsyncContext *c, const char* channel, void* privdata) {
mqLog("%s:%s,%p\n", __FUNCTION__, channel, privdata);
return redisAsyncCommand(c, onReply, privdata, "SUBSCRIBE %s", channel);
}

void mqProcessEvents(aeEventLoop *eventLoop){
mqLog("%s:%p,%d\n", __FUNCTION__, eventLoop, eventLoop->stop);
if (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

#ifdef __cplusplus
}
#endif
40 changes: 21 additions & 19 deletions msvs/hiredis/hiredis.def
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
EXPORTS
redisFree
redisCommand
freeReplyObject

redisAeAttach
redisAsyncConnect
redisAsyncCommand

aeCreateEventLoop
aeMain

mqConnect
mqLog
mqReplyType
mqReplyInt
mqReplyStrLen
mqReplyStr
mqSubscribe
EXPORTS
redisFree
redisCommand
freeReplyObject

redisAeAttach
redisAsyncConnect
redisAsyncCommand

aeCreateEventLoop
aeMain
aeProcessEvents

mqConnect
mqLog
mqReplyType
mqReplyInt
mqReplyStrLen
mqReplyStr
mqSubscribe
mqProcessEvents

0 comments on commit 0fd01f3

Please sign in to comment.