Created
May 24, 2019 21:19
-
-
Save hltbra/1f2266b96c50700cc17a10115634e55f to your computer and use it in GitHub Desktop.
Experiments using ae.c/anet.c with Cython + dredis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <errno.h> | |
#include "ae.h" | |
#include "anet.h" | |
#include "cdredis.h" | |
static struct aeEventLoop *eventLoop; | |
static char neterr[100000]; | |
/*
 * AE_WRITABLE handler: flush the client's pending reply to its socket.
 *
 * privdata is the Client that readQuery() registered this handler with.
 * The reply is the NUL-terminated string held in client->out_buffer.
 *
 * - A transient failure (EAGAIN/EWOULDBLOCK/EINTR) leaves the handler
 *   registered so the write is retried on the next writable event.
 * - A short write keeps only the unsent tail in out_buffer and also leaves
 *   the handler registered.
 * - Any other write error closes the client.
 * - Once everything has been written, out_buffer is cleared and the handler
 *   is removed so the event loop does not spin on an empty buffer.
 */
void sendReply(aeEventLoop *el, int fd, void *privdata, int mask) {
    Client *client = privdata;
    (void)mask;

    size_t pending = strlen(client->out_buffer);
    if (pending == 0) {
        /* Nothing to send: unregister rather than being woken up forever. */
        aeDeleteFileEvent(el, fd, AE_WRITABLE);
        return;
    }

    ssize_t nwritten = write(fd, client->out_buffer, pending);
    if (nwritten == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            return; /* still registered; try again on the next writable event */
        }
        printf("Closing %d (write errno=%d)\n", fd, errno);
        closeClient(client);
        return;
    }

    if ((size_t)nwritten < pending) {
        /* Shift the unsent tail (and its terminating NUL) to the front. */
        memmove(client->out_buffer, client->out_buffer + nwritten,
                pending - (size_t)nwritten + 1);
        return;
    }

    memset(client->out_buffer, 0, sizeof(client->out_buffer));
    aeDeleteFileEvent(el, fd, AE_WRITABLE);
}
/*
 * Default message callback used by main(): logs that it ran and queues the
 * fixed reply "+1\r\n" in the client's output buffer, whatever the request.
 */
void on_message(Client *client) {
    static const char reply[] = "+1\r\n";

    printf("called C on_message\n");
    snprintf(client->out_buffer, sizeof(client->out_buffer), "%s", reply);
}
/*
 * AE_READABLE handler: read one chunk from the client, hand it to the
 * client's on_message callback, and schedule sendReply() if the callback
 * produced output.
 *
 * privdata is the Client registered by acceptHandler(). At most READ_CHUNK
 * bytes are read per call, into the start of client->buffer; the data is
 * NUL-terminated so the callback never sees leftovers of an earlier, longer
 * request.
 *
 * A transient read failure (EAGAIN/EWOULDBLOCK/EINTR) just returns; any
 * other error, or end-of-file, closes the client.
 */
void readQuery(aeEventLoop *el, int fd, void *privdata, int mask) {
    /* Must stay smaller than sizeof(client->buffer) to leave room for the NUL. */
    enum { READ_CHUNK = 1024 * 16 };
    Client *client = privdata;
    (void)mask;

    ssize_t nread = read(fd, client->buffer, READ_CHUNK);
    if (nread == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            printf("try again\n");
            return;
        }
        printf("Closing %d (errno=%d)\n", fd, errno);
        closeClient(client);
        return;
    }
    if (nread == 0) {
        printf("Closing %d (nread=0)\n", fd);
        closeClient(client);
        return;
    }

    client->buffer[nread] = '\0';

    if (client->on_message != NULL) {
        client->on_message(client);
    }

    /*
     * Register the write handler only once there is something to send, so
     * the event loop is not woken up for an empty buffer. sendReply()
     * removes the handler again after the buffer has been flushed.
     */
    if (client->out_buffer[0] == '\0') {
        return;
    }
    if (aeCreateFileEvent(el, fd, AE_WRITABLE, sendReply, client) == AE_ERR) {
        closeClient(client);
    }
}
Client *createClient(int fd) { | |
Client *client; | |
client = malloc(sizeof(Client)); | |
client->fd = fd; | |
memset(client->buffer, 0, sizeof(client->buffer)); | |
memset(client->out_buffer, 0, sizeof(client->out_buffer)); | |
return client; | |
} | |
/*
 * Tear down a client: unregister its read and write handlers from the
 * shared event loop, close its socket and free the Client itself.
 *
 * Passing NULL is a no-op. The event-loop calls are skipped for fd == -1 so
 * that an invalid descriptor is never passed to aeDeleteFileEvent().
 * The pointer must not be used after this call.
 */
void closeClient(Client *client) {
    if (client == NULL) {
        return;
    }
    if (client->fd != -1) {
        aeDeleteFileEvent(eventLoop, client->fd, AE_READABLE);
        aeDeleteFileEvent(eventLoop, client->fd, AE_WRITABLE);
        close(client->fd);
    }
    free(client);
}
/*
 * Set up a freshly accepted connection.
 *
 * Puts fd into non-blocking mode with TCP_NODELAY, wraps it in a Client,
 * installs on_message_callback on it and registers readQuery() for
 * readable events. The flags argument is currently unused.
 * On any failure the socket is closed and nothing stays registered.
 */
void acceptHandler(int fd, int flags, void (*on_message_callback)(Client *)) {
    if (fd == -1) {
        return;
    }

    anetNonBlock(NULL, fd);
    anetEnableTcpNoDelay(NULL, fd);

    Client *newcomer = createClient(fd);
    if (newcomer == NULL) {
        printf("HHHError registering fd event for the new client: %s (fd=%d)\n",
               strerror(errno), fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }

    newcomer->on_message = on_message_callback;

    if (aeCreateFileEvent(eventLoop, fd, AE_READABLE, readQuery, newcomer) != AE_ERR) {
        return;
    }

    printf("AE_ERR: READABLE\n");
    closeClient(newcomer);
}
/*
 * AE_READABLE handler for the listening socket.
 *
 * privdata carries the on_message callback to install on each new client.
 * Accepts up to 1000 pending connections per invocation and hands each one
 * to acceptHandler(); stops as soon as anetTcpAccept() reports an error
 * (which includes "no more pending connections" on a non-blocking socket).
 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    printf("accept\n");
    void (*on_message_callback)(Client *) = privdata;
    int max = 1000;
    char cip[10000];
    (void)el;
    (void)mask;

    while (max--) {
        int cport;

        printf("max=%d\n", max);
        int cfd = anetTcpAccept(neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            /* Capture errno first: the printf calls below may overwrite it. */
            int accept_errno = errno;

            printf("ANET_ERR acceptTCP (%d), errno=%d (%s)\n", cfd, accept_errno,
                   (accept_errno != EWOULDBLOCK) ? "": "wouldblock");
            if (accept_errno != EWOULDBLOCK) {
                printf("Accepting client connection: %s\n", neterr);
            }
            return;
        }
        printf("Accepted %s:%d\n", cip, cport);
        acceptHandler(cfd, 0, on_message_callback);
    }
}
/*
 * Create the event loop, start listening, and run the loop until it stops.
 *
 * on_message_callback is installed on every accepted client.
 * Returns 0 after the loop has finished, or 1 if the event loop or the
 * listening socket could not be set up (nothing is left allocated then).
 */
int startServer(void (*on_message_callback)(Client *)) {
    eventLoop = aeCreateEventLoop(100000);
    if (eventLoop == NULL) {
        printf("Unable to create the event loop\n");
        return 1;
    }

    int fd = create_tcp_server(on_message_callback);
    if (fd == -1) {
        aeDeleteEventLoop(eventLoop);
        eventLoop = NULL;
        return 1;
    }

    aeMain(eventLoop);

    /* The loop has stopped: release the listening socket and the loop. */
    aeDeleteFileEvent(eventLoop, fd, AE_READABLE);
    close(fd);
    aeDeleteEventLoop(eventLoop);
    eventLoop = NULL;
    return 0;
}
/*
 * Open the listening socket on 0.0.0.0:6377 (backlog 5000), make it
 * non-blocking and register acceptTcpHandler() for it on the shared event
 * loop, passing on_message_callback through as the handler's private data.
 *
 * Returns the listening descriptor, or -1 on failure. On failure the socket
 * is closed again, so the caller has nothing to clean up.
 */
int create_tcp_server(void (*on_message_callback)(Client *)) {
    int backlog = 5000;
    char *address = "0.0.0.0";
    int port = 6377;

    int fd = anetTcpServer(neterr, port, address, backlog);
    if (fd == ANET_ERR) {
        printf("anet_err tcp server\n");
        return -1;
    }
    printf("got server\n");

    if (anetNonBlock(NULL, fd) == ANET_ERR) {
        printf("ANET_ERR nonblock");
        close(fd);
        return -1;
    }

    if (aeCreateFileEvent(eventLoop, fd, AE_READABLE, acceptTcpHandler,
                          (void *)on_message_callback) == AE_ERR) {
        printf("Unrecoverable error creating server.ipfd file event.\n");
        close(fd);
        return -1;
    }
    printf("got file event\n");

    return fd;
}
int main() { | |
return startServer(on_message); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef CDREDIS_H
#define CDREDIS_H

/*
 * Per-connection state.
 *
 * buffer      - most recent request bytes read from the socket.
 * out_buffer  - NUL-terminated reply waiting to be written; an empty string
 *               means there is nothing to send.
 * on_message  - callback invoked after each successful read; it may fill
 *               out_buffer to queue a reply.
 * fd          - the client's socket descriptor.
 */
typedef struct _client {
    char buffer[1024 * 1024];
    char out_buffer[1024 * 1024];
    void (*on_message)(struct _client *);
    int fd;
} Client;

/* Unregister, close and free a client. */
void closeClient(Client *client);

/* Allocate a zero-initialised client for fd; returns NULL on allocation failure. */
Client *createClient(int fd);

/* Open the listening socket and register it on the event loop; returns the fd or -1. */
int create_tcp_server(void (*on_message_callback)(Client *));

/* Run the server; on_message_callback is called for every chunk read. Returns 0 or 1. */
int startServer(void (*on_message_callback)(Client *));

#endif /* CDREDIS_H */
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Cython view of the C API declared in cdredis.h.
cdef extern from "cdredis.h":

    # Per-connection state; field names must match the C struct.
    ctypedef struct Client:
        char *buffer
        char *out_buffer
        void (*on_message)(Client *)
        int fd

    # Runs the server, calling on_message for each chunk read from a client.
    int startServer(void (*on_message)(Client *))

    # Allocates a Client for an already-connected socket descriptor.
    Client *createClient(int)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from libc.string cimport strcpy, memset | |
from dredis import db | |
from dredis.keyspace import Keyspace | |
from dredis.server import execute_cmd | |
from dredis.parser import Parser | |
cimport cythondredis | |
class Parser2(Parser):
    """Parser that works on a payload supplied up front.

    The base-class constructor is intentionally not called; only the
    ``_buffer`` attribute is set, to the data passed in.
    """

    def __init__(self, data):
        self._buffer = data

    def _read_into_buffer(self):
        """Do nothing: all the data is already in ``_buffer``."""
        return None
# Single keyspace shared by every connection for the lifetime of the process.
keyspace = Keyspace()


cdef void on_message(cythondredis.Client *c):
    # Callback run by the C server after each read.
    #
    # Parses the request bytes in c.buffer, executes every command found and
    # queues the replies in c.out_buffer for the C side to send.
    #
    # Replies are collected first and copied once: copying each reply to the
    # start of out_buffer as it is produced would leave only the last reply
    # when one read contains several commands.
    replies = []
    parser = Parser2(c.buffer)
    for cmd in parser.get_instructions():
        execute_cmd(keyspace, replies.append, *cmd)
    memset(c.buffer, 0, sizeof(c.buffer))

    payload = b"".join(replies)
    # Leave room for the terminating NUL; never write past out_buffer.
    capacity = sizeof(c.out_buffer) - 1
    if len(payload) > capacity:
        payload = payload[:capacity]
    if payload:
        strcpy(c.out_buffer, payload)


db.DB_MANAGER.setup_dbs('/tmp/cython-db', 'memory', {})
cythondredis.startServer(on_message)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build libcdredis.so inside the Redis source tree, then build the Cython
# extension against it and check that it imports.

# Stop here if the source tree is missing instead of building in the wrong place.
cd ~/projects/redis/src || exit 1

# Picked up by the extension build: headers and the shared library both live
# in the current directory.
export CFLAGS="-I$PWD"
export LDFLAGS="-L$PWD"

# -fPIC: code linked into a shared object must be position independent.
gcc -O2 -fPIC zmalloc.c ae.c anet.c cdredis.c -shared -o libcdredis.so \
    && python setup.py build_ext -i \
    && python -c 'import cythondredis'
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from distutils.core import setup | |
from distutils.extension import Extension | |
from Cython.Build import cythonize | |
# The extension is compiled from cythondredis.pyx and linked against the
# cdredis shared library.
cythondredis_extension = Extension(
    "cythondredis",
    ["cythondredis.pyx"],
    libraries=["cdredis"],
)

setup(ext_modules=cythonize([cythondredis_extension]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment