Cheap Messaging

Sometimes you're prototyping something quick, and you want to have an Internet-accessible endpoint that will let you forward messages along, either in a one-to-one or a one-to-many configuration.

I've found myself doing this quite a bit over the years to prototype something or another, and hit upon a decent pattern way back in the day that I've replicated a number of times with some variations.

Again, this is not meant for production use, but instead you might be able to get away with some of these constraints for a hacky prototype.

Platform

Some common solutions upon which you can build your prototype include these.

Hosting

If you have a server of your own, you can probably rehost an existing solution.

From the public cloud offerings, here are some solutions and costs.

Libraries and Runtimes

Depending on your platform, you'll probably want to access this with some more or less specialized library.

Example

I created an MY_RESOURCE resource with non-TLS port enabled, redis version 6, Entra Auth disabled, Access Keys enabled.

Host Name: MY_RESOURCE.redis.cache.windows.net
Primary Key: MY_KEY

Today, I'm choosing C for the language and hiredis as my library. We're going to be building up an rmsg.c file to walk through the basics of how to do both message queue style and broadcast style messaging.

The build steps are straightforward enough that you can simply run the last command repeatedly.

$ pushd ~/scratch
$ git clone https://github.com/redis/hiredis
$ export REDIS_HOST=xxx
$ export REDIS_PWD=xxx
$ pushd hiredis && make && popd
$ clang rmsg.c -o rmsg hiredis/libhiredis.a && ./rmsg

Let's open up a rmsg.c file and type away.

We're going to work through examples, so our main function will simply grab host and password and we'll go from there.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h> // for receiver threads
#include <unistd.h> // for sleep

#include "hiredis/hiredis.h"

const char *g_rhost;
const char *g_rpwd;
const int g_rport = 6379;

// ...

int main() {
  g_rhost = getenv("REDIS_HOST");
  g_rpwd = getenv("REDIS_PWD");

  printf("Testing message queue pattern\n");
  doMessageQueue();

  printf("Testing broadcast pattern\n");
  doBroadcast();

  return 0;
}

Each "do..." function will kick off a reader as needed and play the part of the writer. There are other ways of organizing but this keeps things isolated.

Example - Connecting

One thing we'll be doing a lot is creating connections, so let's start with that.

//! Create a connection to a redis host.
static redisContext *connectToHost(const char *rhost, const char *rpwd) {
  redisContext *rctx; // redis context object
  redisReply *reply;  // redis reply object

  printf("Connecting to redis server %s...\n", rhost);
  rctx = redisConnect(rhost, g_rport);
  if (!rctx || rctx->err) {
    if (rctx) {
      printf("Failed to connect: %s\n", rctx->errstr);
    } else {
      printf("Failed to create redis context\n");
    }
    return 0;
  }

  printf("Authenticating with redis server...\n");
  reply = redisCommand(rctx, "AUTH %s", rpwd);
  if (!reply || rctx->err) {
    printf("Failed redis authorization\n");
    freeReplyObject(reply); // ok to call if null
    redisFree(rctx);
    return 0;
  }
  freeReplyObject(reply); 
  printf("Connected to redis server\n");
  return rctx;
}

Example - message queue

To use a message queue, we'll have a key that holds a list of values, each representing a message. The key acts as the "inbox" in a sense.

We can send a message with RPUSH and receive it with LPOP, which creates a first-in-first-out queue.

static void *doMessageQueueListen(void *ctx) {
  redisContext *listenContext;
  redisReply *reply;

  // Read a message.
  listenContext = connectToHost(g_rhost, g_rpwd);
  if (!listenContext) {
    return NULL;
  }
  reply = redisCommand(listenContext, "LPOP %s", g_peerQueueName);
  if (reply) {
    printf("Message queue listener read: %s\n", reply->str);
    freeReplyObject(reply);
  }
  redisFree(listenContext);
  
  return NULL;
}

//! Run through a message queue example.
static int doMessageQueue() {
  pthread_t receiverThread;
  pthread_attr_t receiverThreadAttr;
  redisContext *senderContext;
  redisReply *senderReply;

  // Start a thread that will listen to a message queue.
  if (pthread_create(&receiverThread, NULL, doMessageQueueListen, NULL)) {
    printf("Failed to create thread\n");
    return 1;
  }
  
  // Send a message.
  senderContext = connectToHost(g_rhost, g_rpwd);
  if (!senderContext) {
    printf("Failed to create sender context\n");
    return 1;
  }
  senderReply = redisCommand(senderContext, "RPUSH %s %s", g_peerQueueName, "howdy");
  freeReplyObject(senderReply);
  redisFree(senderContext);
  
  // Exit when the message is received.
  printf("Waiting for listener to exit ...\n");
  return pthread_join(receiverThread, NULL);
}

Note that the LPOP command is blocking. If you're using synchronous calls, you can interrupt the command by closing the socket from the receiver context - you just need to be a bit more careful to make sure that the context has been successfully created before doing that.

Example - broadcast

Here is the code for the broadcast case. This time I decided to show how we can keep looping to receive messages.

In this case, send a message with PUBLISH and receive it with SUBSCRIBE, which creates a one-to-many publishing mechanism where we do not queue - if you're not connected at the time the message is published, you won't receive it. This presumably works better for more realtime cases where older information might not be as useful as having the latest/greatest.

//! Run through a broadcast listener.
static void *doBroadcastListen(void *ctx) {
  redisContext *listenContext;
  redisReply *reply;

  // Listen for a message.
  listenContext = connectToHost(g_rhost, g_rpwd);
  if (!listenContext) {
    return NULL;
  }
  reply = redisCommand(listenContext, "SUBSCRIBE %s", g_broadcastTopic);
  if (reply) {
    freeReplyObject(reply);
    reply = NULL;
  }

  // read messages forever (although we'll stop after first)
  for (;;) {
    if (redisGetReply(listenContext, (void **)&reply) != REDIS_OK) {
      printf("Failed to read reply\n");
      break;
    }
    if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 3) {
      printf("Broadcast listener message: %.*s\n",
        (int)reply->element[2]->len, reply->element[2]->str);
    } else {
      printf("did not understand reply\n");
    }
    freeReplyObject(reply);
    reply = NULL;
    break;
  }

  freeReplyObject(reply);
  redisFree(listenContext);
 
  return NULL;
}

//! Run through a message broadcast example.
static int doBroadcast() {
  pthread_t receiverThread;
  redisContext *senderContext;
  redisReply *senderReply;

  // Start a thread that will listen to broadcasts.
  if (pthread_create(&receiverThread, NULL, doBroadcastListen, NULL)) {
    printf("Failed to create thread\n");
    return 1;
  }

  // Wait a bit before sending, because these *do not* queue.
  sleep(1);
  
  // Send a message.
  senderContext = connectToHost(g_rhost, g_rpwd);
  if (!senderContext) {
    printf("Failed to create sender context\n");
    return 1;
  }
  senderReply = redisCommand(senderContext,
    "PUBLISH %s %s", g_broadcastTopic, "broadcast!");
  freeReplyObject(senderReply);
  redisFree(senderContext);
  
  // Exit when the message is received.
  printf("Waiting for broadcast listener to exit ...\n");
  return pthread_join(receiverThread, NULL);
}

Well, that was quite a lot of code, but hopefully it's straightforward enough that it's easy to pick apart and understand.

Happy messaging exchange!

Tags:  codingdesignnet

Home