In this blog post, we show how easy it is to implement the Publish/Subscribe pattern with Mongoose over plain TCP. As always, the full source code of this example is available on GitHub. All you need to do is clone the repo and type “make” in the example directory.

How the example works

When the example is built, it produces an executable that can be started in either client or server mode:

$ ./publish_subscribe
Usage: ./publish_subscribe <port> <client|server>

We start one server and several clients:

$ ./publish_subscribe 1234 server
Starting pubsub server on port 1234
$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.

Now, when we type a message in the terminal of one of the clients and press enter, the message is sent to the server. The server then broadcasts it to every connected client, including the one that sent it. Here, typing a message and pressing enter is PUBLISHING. Each client, by connecting to the server, SUBSCRIBES to the messages published by the clients.

Let’s see how it works in a terminal. A client that sends a message looks something like this:

$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
hello
hello

The first “hello” is the typed message. The second one is the broadcast from the server. Every other connected client shows only the broadcast message:

$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
hello

Now, let’s see how this is implemented.

Server code

The server code is simple because it doesn’t need to deal with STDIO. In main(), it opens a listening connection:

// Server code path
mg_mgr_init(&mgr, NULL);
mg_bind(&mgr, argv[1], server_handler);
printf("Starting pubsub server on port %s\n", argv[1]);

The event handler only needs to handle the MG_EV_RECV event: when a message is received, we iterate over all active connections and push that message to each of them. At the end, we remove the received message from the IO buffer. Here is the code:

static void server_handler(struct mg_connection *nc, int ev, void *p) {
  (void) p;
  if (ev == MG_EV_RECV) {
    // Push received message to all connections
    struct mbuf *io = &nc->recv_mbuf;
    struct mg_connection *c;
    for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c)) {
      mg_send(c, io->buf, io->len);
    }
    mbuf_remove(io, io->len);
  }
}
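
To make the fan-out step concrete without Mongoose, here is a minimal sketch using plain POSIX sockets. The broadcast() helper and its parameters are hypothetical names chosen for illustration, not part of the Mongoose API. It assumes blocking sockets and simply retries partial sends:

```c
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical helper (not a Mongoose function): send the same buffer to
 * every socket in fds[]. Returns how many sockets accepted the whole buffer. */
static int broadcast(const int *fds, int nfds, const char *buf, size_t len) {
  int delivered = 0;
  for (int i = 0; i < nfds; i++) {
    size_t off = 0;
    while (off < len) {
      /* send() may accept fewer bytes than requested, so loop until done */
      ssize_t n = send(fds[i], buf + off, len - off, 0);
      if (n <= 0) break; /* error: give up on this subscriber */
      off += (size_t) n;
    }
    if (off == len) delivered++;
  }
  return delivered;
}
```

In the Mongoose version, mg_send() takes care of buffering, so the handler does not need the retry loop.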

Client code

The client code is a bit more elaborate. It needs to read from standard input in case the user types a message, and it needs to print each received message to standard output. So, in main(), the client code creates a socket pair. One end of the pair is given to a dedicated thread that reads from stdin. The other end is monitored by the event manager. We also create a connection to the server:

int fds[2];
struct mg_connection *ioconn, *server_conn;
mg_mgr_init(&mgr, NULL);
// Connect to the pubsub server
server_conn = mg_connect(&mgr, argv[1], client_handler);
if (server_conn == NULL) {
  fprintf(stderr, "Cannot connect to port %s\n", argv[1]);
  exit(EXIT_FAILURE);
}
// Create a socketpair and give one end to the thread that reads stdin
mg_socketpair(fds, SOCK_STREAM);
mg_start_thread(stdin_thread, &fds[1]);
// The other end of the pair is added to the event manager
ioconn = mg_add_sock(&mgr, fds[0], client_handler);
ioconn->flags |= MG_F_USER_1;  // Mark this so we know this is stdin
ioconn->user_data = server_conn;
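
The stdin_thread function is not shown in this post. As a minimal sketch of what such a reader might do, assuming it simply copies bytes from an input stream to its end of the socket pair, consider the hypothetical forward_stream() helper below. The name and return convention are ours, not Mongoose's:

```c
#include <stdio.h>
#include <sys/socket.h>

/* Hypothetical helper: copy every byte from `in` to the socket until EOF.
 * Returns the number of bytes forwarded, or -1 if a send fails. */
static long forward_stream(FILE *in, int sock) {
  long total = 0;
  int ch;
  while ((ch = fgetc(in)) != EOF) {
    unsigned char c = (unsigned char) ch;
    /* One byte per send keeps the sketch simple; the reader on the
     * other end still sees an ordinary byte stream. */
    if (send(sock, &c, 1, 0) != 1) return -1;
    total++;
  }
  return total;
}
```

A thread entry point could then call forward_stream(stdin, sock) with the descriptor it was handed, so that blocking reads from the terminal never stall the event loop.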

The event handler is simple. If we receive data from the stdin connection, we forward it to the server. If we receive data from the server, we print it to stdout:

static void client_handler(struct mg_connection *conn, int ev, void *p) {
  struct mbuf *io = &conn->recv_mbuf;
  (void) p;
  if (ev == MG_EV_CONNECT) {
    if (conn->flags & MG_F_CLOSE_IMMEDIATELY) {
      printf("%s\n", "Error connecting to server!");
      exit(EXIT_FAILURE);
    }
    printf("%s\n", "Connected to server. Type a message and press enter.");
  } else if (ev == MG_EV_RECV) {
    if (conn->flags & MG_F_USER_1) {
      // Received data from the stdin, forward it to the server
      struct mg_connection *c = (struct mg_connection *) conn->user_data;
      mg_send(c, io->buf, io->len);
      mbuf_remove(io, io->len);
    } else {
      // Received data from server connection, print it
      fwrite(io->buf, io->len, 1, stdout);
      mbuf_remove(io, io->len);
    }
  } else if (ev == MG_EV_CLOSE) {
    // Connection has closed, most probably because the server has stopped
    exit(EXIT_SUCCESS);
  }
}
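
For readers curious about what this routing amounts to at the socket level, here is a rough stand-in written with poll() instead of Mongoose's event manager. It is a sketch under our own assumptions, not Mongoose's implementation: relay_once() and its parameters are hypothetical, and it handles a single pass over the two descriptors.

```c
#include <poll.h>
#include <stdio.h>
#include <sys/socket.h>

/* Hypothetical single pass of a client loop: wait for data on either
 * descriptor, forward typed input to the server, and print server data
 * to `out`. Returns the number of descriptors handled, or -1 on
 * error or when a peer has closed. */
static int relay_once(int input_fd, int server_fd, FILE *out, int timeout_ms) {
  struct pollfd pfds[2] = {
    {.fd = input_fd, .events = POLLIN},
    {.fd = server_fd, .events = POLLIN},
  };
  char buf[512];
  int handled = 0;
  if (poll(pfds, 2, timeout_ms) < 0) return -1;
  if (pfds[0].revents & POLLIN) {
    /* Data from the stdin side: forward it to the server */
    ssize_t n = recv(input_fd, buf, sizeof(buf), 0);
    if (n <= 0) return -1;
    if (send(server_fd, buf, (size_t) n, 0) != n) return -1;
    handled++;
  }
  if (pfds[1].revents & POLLIN) {
    /* Data from the server: print it */
    ssize_t n = recv(server_fd, buf, sizeof(buf), 0);
    if (n <= 0) return -1; /* server closed the connection */
    fwrite(buf, 1, (size_t) n, out);
    handled++;
  }
  return handled;
}
```

With Mongoose, the MG_F_USER_1 flag plays the role of the input_fd/server_fd distinction, and the event manager performs the waiting.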

Enjoy recreating this example!
To get in touch, send us a message or ask on the developer forum.