pidgin/pidgin

1896a80ff8e3
Route GLib debug logging directly to the Finch debug window

Instead of flowing through purple debug, this merges some bits of the existing GLib log handler, and the purple debug printer.

Testing Done:
Open the Debug window an see some `GLib-*` outputs.

Reviewed at https://reviews.imfreedom.org/r/1057/
/* purple
*
* Purple is the legal property of its developers, whose names are too numerous
* to list here. Please refer to the COPYRIGHT file distributed with this
* source distribution.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
*/
#include <glib/gi18n-lib.h>
#include <glib/gprintf.h>
#include <stdarg.h>
#include <string.h>
#include <purple.h>
#include "mqtt.h"
#include "util.h"
typedef struct
{
PurpleConnection *gc;
GIOStream *conn;
GBufferedInputStream *input;
PurpleQueuedOutputStream *output;
GCancellable *cancellable;
gboolean connected;
guint16 mid;
GByteArray *rbuf;
gsize remz;
gint tev;
} FbMqttPrivate;
/**
* FbMqtt:
*
* Represents an MQTT connection.
*/
struct _FbMqtt
{
GObject parent;
FbMqttPrivate *priv;
};
G_DEFINE_TYPE_WITH_PRIVATE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
typedef struct
{
FbMqttMessageType type;
FbMqttMessageFlags flags;
GByteArray *bytes;
guint offset;
guint pos;
gboolean local;
} FbMqttMessagePrivate;
/**
* FbMqttMessage:
*
* Represents a reader/writer for an MQTT message.
*/
struct _FbMqttMessage
{
GObject parent;
FbMqttMessagePrivate *priv;
};
G_DEFINE_TYPE_WITH_PRIVATE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
static void fb_mqtt_read_packet(FbMqtt *mqtt);
static void
fb_mqtt_dispose(GObject *obj)
{
FbMqtt *mqtt = FB_MQTT(obj);
FbMqttPrivate *priv = mqtt->priv;
fb_mqtt_close(mqtt);
g_byte_array_free(priv->rbuf, TRUE);
}
static void
fb_mqtt_class_init(FbMqttClass *klass)
{
GObjectClass *gklass = G_OBJECT_CLASS(klass);
gklass->dispose = fb_mqtt_dispose;
/**
* FbMqtt::connect:
* @mqtt: The #FbMqtt.
*
* Emitted upon the successful completion of the connection
* process. This is emitted as a result of #fb_mqtt_connect().
*/
g_signal_new("connect",
G_TYPE_FROM_CLASS(klass),
G_SIGNAL_ACTION,
0,
NULL, NULL, NULL,
G_TYPE_NONE,
0);
/**
* FbMqtt::error:
* @mqtt: The #FbMqtt.
* @error: The #GError.
*
* Emitted whenever an error is hit within the #FbMqtt. This
* should close the #FbMqtt with #fb_mqtt_close().
*/
g_signal_new("error",
G_TYPE_FROM_CLASS(klass),
G_SIGNAL_ACTION,
0,
NULL, NULL, NULL,
G_TYPE_NONE,
1, G_TYPE_ERROR);
/**
* FbMqtt::open:
* @mqtt: The #FbMqtt.
*
* Emitted upon the successful opening of the remote socket.
* This is emitted as a result of #fb_mqtt_open(). This should
* call #fb_mqtt_connect().
*/
g_signal_new("open",
G_TYPE_FROM_CLASS(klass),
G_SIGNAL_ACTION,
0,
NULL, NULL, NULL,
G_TYPE_NONE,
0);
/**
* FbMqtt::publish:
* @mqtt: The #FbMqtt.
* @topic: The topic.
* @pload: The payload.
*
* Emitted upon an incoming message from the steam.
*/
g_signal_new("publish",
G_TYPE_FROM_CLASS(klass),
G_SIGNAL_ACTION,
0,
NULL, NULL, NULL,
G_TYPE_NONE,
2, G_TYPE_STRING, G_TYPE_BYTE_ARRAY);
}
static void
fb_mqtt_init(FbMqtt *mqtt)
{
FbMqttPrivate *priv = fb_mqtt_get_instance_private(mqtt);
mqtt->priv = priv;
priv->rbuf = g_byte_array_new();
}
static void
fb_mqtt_message_dispose(GObject *obj)
{
FbMqttMessagePrivate *priv = FB_MQTT_MESSAGE(obj)->priv;
if ((priv->bytes != NULL) && priv->local) {
g_byte_array_free(priv->bytes, TRUE);
}
}
static void
fb_mqtt_message_class_init(FbMqttMessageClass *klass)
{
GObjectClass *gklass = G_OBJECT_CLASS(klass);
gklass->dispose = fb_mqtt_message_dispose;
}
static void
fb_mqtt_message_init(FbMqttMessage *msg)
{
FbMqttMessagePrivate *priv = fb_mqtt_message_get_instance_private(msg);
msg->priv = priv;
}
GQuark
fb_mqtt_error_quark(void)
{
static GQuark q = 0;
if (G_UNLIKELY(q == 0)) {
q = g_quark_from_static_string("fb-mqtt-error-quark");
}
return q;
}
FbMqtt *
fb_mqtt_new(PurpleConnection *gc)
{
FbMqtt *mqtt;
FbMqttPrivate *priv;
g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL);
mqtt = g_object_new(FB_TYPE_MQTT, NULL);
priv = mqtt->priv;
priv->gc = gc;
return mqtt;
};
void
fb_mqtt_close(FbMqtt *mqtt)
{
FbMqttPrivate *priv;
g_return_if_fail(FB_IS_MQTT(mqtt));
priv = mqtt->priv;
if (priv->tev > 0) {
g_source_remove(priv->tev);
priv->tev = 0;
}
if (priv->cancellable != NULL) {
g_cancellable_cancel(priv->cancellable);
g_clear_object(&priv->cancellable);
}
if (priv->conn != NULL) {
purple_gio_graceful_close(priv->conn,
G_INPUT_STREAM(priv->input),
G_OUTPUT_STREAM(priv->output));
g_clear_object(&priv->input);
g_clear_object(&priv->output);
g_clear_object(&priv->conn);
}
priv->connected = FALSE;
g_byte_array_set_size(priv->rbuf, 0);
}
static void
fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix)
{
if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
/* Return as cancelled means the connection is closing */
g_error_free(err);
return;
}
/* Now we can check for programming errors */
g_return_if_fail(FB_IS_MQTT(mqtt));
if (prefix != NULL) {
g_prefix_error(&err, "%s: ", prefix);
}
g_signal_emit_by_name(mqtt, "error", err);
g_error_free(err);
}
static void
fb_mqtt_error_literal(FbMqtt *mqtt, FbMqttError error, const gchar *msg)
{
GError *err;
g_return_if_fail(FB_IS_MQTT(mqtt));
err = g_error_new_literal(FB_MQTT_ERROR, error, msg);
g_signal_emit_by_name(mqtt, "error", err);
g_error_free(err);
}
void
fb_mqtt_error(FbMqtt *mqtt, FbMqttError error, const gchar *format, ...)
{
GError *err;
va_list ap;
g_return_if_fail(FB_IS_MQTT(mqtt));
va_start(ap, format);
err = g_error_new_valist(FB_MQTT_ERROR, error, format, ap);
va_end(ap);
g_signal_emit_by_name(mqtt, "error", err);
g_error_free(err);
}
static gboolean
fb_mqtt_cb_timeout(gpointer data)
{
FbMqtt *mqtt = data;
FbMqttPrivate *priv = mqtt->priv;
priv->tev = 0;
fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
_("Connection timed out"));
return FALSE;
}
static void
fb_mqtt_timeout_clear(FbMqtt *mqtt)
{
FbMqttPrivate *priv = mqtt->priv;
if (priv->tev > 0) {
g_source_remove(priv->tev);
priv->tev = 0;
}
}
static void
fb_mqtt_timeout(FbMqtt *mqtt)
{
FbMqttPrivate *priv = mqtt->priv;
fb_mqtt_timeout_clear(mqtt);
priv->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_CONN, fb_mqtt_cb_timeout,
mqtt);
}
static gboolean
fb_mqtt_cb_ping(gpointer data)
{
FbMqtt *mqtt = data;
FbMqttMessage *msg;
FbMqttPrivate *priv = mqtt->priv;
msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0);
fb_mqtt_write(mqtt, msg);
g_object_unref(msg);
priv->tev = 0;
fb_mqtt_timeout(mqtt);
return FALSE;
}
static void
fb_mqtt_ping(FbMqtt *mqtt)
{
FbMqttPrivate *priv = mqtt->priv;
fb_mqtt_timeout_clear(mqtt);
priv->tev =
g_timeout_add_seconds(FB_MQTT_TIMEOUT_PING, fb_mqtt_cb_ping, mqtt);
}
static void
fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data)
{
GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source);
FbMqtt *mqtt = data;
gssize ret;
GError *err = NULL;
ret = g_buffered_input_stream_fill_finish(input, res, &err);
if (ret < 1) {
if (ret == 0) {
err = g_error_new_literal(G_IO_ERROR,
G_IO_ERROR_CONNECTION_CLOSED,
_("Connection closed"));
}
fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header"));
return;
}
fb_mqtt_read_packet(mqtt);
}
static void
fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
{
FbMqtt *mqtt = data;
FbMqttPrivate *priv;
gssize ret;
FbMqttMessage *msg;
GError *err = NULL;
ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err);
if (ret < 1) {
if (ret == 0) {
err = g_error_new_literal(G_IO_ERROR,
G_IO_ERROR_CONNECTION_CLOSED,
_("Connection closed"));
}
fb_mqtt_take_error(mqtt, err, _("Failed to read packet data"));
return;
}
priv = mqtt->priv;
priv->remz -= ret;
if (priv->remz > 0) {
g_input_stream_read_async(G_INPUT_STREAM(source),
priv->rbuf->data +
priv->rbuf->len - priv->remz, priv->remz,
G_PRIORITY_DEFAULT, priv->cancellable,
fb_mqtt_cb_read_packet, mqtt);
return;
}
msg = fb_mqtt_message_new_bytes(priv->rbuf);
if (G_UNLIKELY(msg == NULL)) {
fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
_("Failed to parse message"));
return;
}
fb_mqtt_read(mqtt, msg);
g_object_unref(msg);
/* Read another packet if connection wasn't reset in fb_mqtt_read() */
if (fb_mqtt_connected(mqtt, FALSE)) {
fb_mqtt_read_packet(mqtt);
}
}
static void
fb_mqtt_read_packet(FbMqtt *mqtt)
{
FbMqttPrivate *priv = mqtt->priv;
const guint8 *buf;
gsize count = 0;
gsize pos;
guint mult = 1;
guint8 byte;
gsize size = 0;
buf = g_buffered_input_stream_peek_buffer(priv->input, &count);
/* Start at 1 to skip the first byte */
pos = 1;
do {
if (pos >= count) {
/* Not enough data yet, try again later */
g_buffered_input_stream_fill_async(priv->input, -1,
G_PRIORITY_DEFAULT, priv->cancellable,
fb_mqtt_cb_fill, mqtt);
return;
}
byte = *(buf + pos++);
size += (byte & 127) * mult;
mult *= 128;
} while ((byte & 128) != 0);
/* Add header to size */
size += pos;
g_byte_array_set_size(priv->rbuf, size);
priv->remz = size;
/* TODO: Use g_input_stream_read_all_async() when available. */
/* TODO: Alternately, it would be nice to let the
* FbMqttMessage directly use the GBufferedInputStream
* buffer instead of copying it, provided it's consumed
* before the next read.
*/
g_input_stream_read_async(G_INPUT_STREAM(priv->input),
priv->rbuf->data, priv->rbuf->len,
G_PRIORITY_DEFAULT, priv->cancellable,
fb_mqtt_cb_read_packet, mqtt);
}
void
fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg)
{
FbMqttMessage *nsg;
FbMqttPrivate *priv;
FbMqttMessagePrivate *mriv;
GByteArray *wytes;
gchar *str;
guint8 chr;
guint16 mid;
g_return_if_fail(FB_IS_MQTT(mqtt));
g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
priv = mqtt->priv;
mriv = msg->priv;
fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
"Reading %d (flags: 0x%0X)",
mriv->type, mriv->flags);
switch (mriv->type) {
case FB_MQTT_MESSAGE_TYPE_CONNACK:
if (!fb_mqtt_message_read_byte(msg, NULL) ||
!fb_mqtt_message_read_byte(msg, &chr))
{
break;
}
if (chr != FB_MQTT_ERROR_SUCCESS) {
fb_mqtt_error(mqtt, chr, _("Connection failed (%u)"),
chr);
return;
}
priv->connected = TRUE;
fb_mqtt_ping(mqtt);
g_signal_emit_by_name(mqtt, "connect");
return;
case FB_MQTT_MESSAGE_TYPE_PUBLISH:
if (!fb_mqtt_message_read_str(msg, &str)) {
break;
}
if ((mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) ||
(mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS2))
{
if (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) {
chr = FB_MQTT_MESSAGE_TYPE_PUBACK;
} else {
chr = FB_MQTT_MESSAGE_TYPE_PUBREC;
}
if (!fb_mqtt_message_read_mid(msg, &mid)) {
g_free(str);
break;
}
nsg = fb_mqtt_message_new(chr, 0);
fb_mqtt_message_write_u16(nsg, mid);
fb_mqtt_write(mqtt, nsg);
g_object_unref(nsg);
}
wytes = g_byte_array_new();
fb_mqtt_message_read_r(msg, wytes);
g_signal_emit_by_name(mqtt, "publish", str, wytes);
g_byte_array_free(wytes, TRUE);
g_free(str);
return;
case FB_MQTT_MESSAGE_TYPE_PUBREL:
if (!fb_mqtt_message_read_mid(msg, &mid)) {
break;
}
nsg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBCOMP, 0);
fb_mqtt_message_write_u16(nsg, mid); /* Message identifier */
fb_mqtt_write(mqtt, nsg);
g_object_unref(nsg);
return;
case FB_MQTT_MESSAGE_TYPE_PINGRESP:
fb_mqtt_ping(mqtt);
return;
case FB_MQTT_MESSAGE_TYPE_PUBACK:
case FB_MQTT_MESSAGE_TYPE_PUBCOMP:
case FB_MQTT_MESSAGE_TYPE_SUBACK:
case FB_MQTT_MESSAGE_TYPE_UNSUBACK:
return;
default:
fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
_("Unknown packet (%u)"), mriv->type);
return;
}
/* Since no case returned, there was a parse error. */
fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
_("Failed to parse message"));
}
static void
fb_mqtt_cb_push_bytes(GObject *source, GAsyncResult *res, gpointer data)
{
PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(source);
FbMqtt *mqtt = data;
GError *err = NULL;
if (!purple_queued_output_stream_push_bytes_finish(stream,
res, &err)) {
purple_queued_output_stream_clear_queue(stream);
fb_mqtt_take_error(mqtt, err, _("Failed to write data"));
return;
}
}
void
fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg)
{
const GByteArray *bytes;
FbMqttMessagePrivate *mriv;
FbMqttPrivate *priv;
GBytes *gbytes;
g_return_if_fail(FB_IS_MQTT(mqtt));
g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
priv = mqtt->priv;
mriv = msg->priv;
bytes = fb_mqtt_message_bytes(msg);
if (G_UNLIKELY(bytes == NULL)) {
fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
_("Failed to format data"));
return;
}
fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
"Writing %d (flags: 0x%0X)",
mriv->type, mriv->flags);
/* TODO: Would be nice to refactor this to not require copying bytes */
gbytes = g_bytes_new(bytes->data, bytes->len);
purple_queued_output_stream_push_bytes_async(priv->output, gbytes,
G_PRIORITY_DEFAULT, priv->cancellable,
fb_mqtt_cb_push_bytes, mqtt);
g_bytes_unref(gbytes);
}
static void
fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data)
{
FbMqtt *mqtt = data;
FbMqttPrivate *priv;
GSocketConnection *conn;
GError *err = NULL;
conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source),
res, &err);
if (conn == NULL) {
fb_mqtt_take_error(mqtt, err, NULL);
return;
}
fb_mqtt_timeout_clear(mqtt);
priv = mqtt->priv;
priv->conn = G_IO_STREAM(conn);
priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
g_io_stream_get_input_stream(priv->conn)));
priv->output = purple_queued_output_stream_new(
g_io_stream_get_output_stream(priv->conn));
fb_mqtt_read_packet(mqtt);
g_signal_emit_by_name(mqtt, "open");
}
void
fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port)
{
FbMqttPrivate *priv;
PurpleAccount *acc;
GSocketClient *client;
GError *err = NULL;
g_return_if_fail(FB_IS_MQTT(mqtt));
priv = mqtt->priv;
acc = purple_connection_get_account(priv->gc);
fb_mqtt_close(mqtt);
client = purple_gio_socket_client_new(acc, &err);
if (client == NULL) {
fb_mqtt_take_error(mqtt, err, NULL);
return;
}
priv->cancellable = g_cancellable_new();
g_socket_client_set_tls(client, TRUE);
g_socket_client_connect_to_host_async(client, host, port,
priv->cancellable, fb_mqtt_cb_open, mqtt);
g_object_unref(client);
fb_mqtt_timeout(mqtt);
}
void
fb_mqtt_connect(FbMqtt *mqtt, guint8 flags, const GByteArray *pload)
{
FbMqttMessage *msg;
g_return_if_fail(!fb_mqtt_connected(mqtt, FALSE));
g_return_if_fail(pload != NULL);
/* Facebook always sends a CONNACK, use QoS1 */
flags |= FB_MQTT_CONNECT_FLAG_QOS1;
msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_CONNECT, 0);
fb_mqtt_message_write_str(msg, FB_MQTT_NAME); /* Protocol name */
fb_mqtt_message_write_byte(msg, FB_MQTT_LEVEL); /* Protocol level */
fb_mqtt_message_write_byte(msg, flags); /* Flags */
fb_mqtt_message_write_u16(msg, FB_MQTT_KA); /* Keep alive */
fb_mqtt_message_write(msg, pload->data, pload->len);
fb_mqtt_write(mqtt, msg);
fb_mqtt_timeout(mqtt);
g_object_unref(msg);
}
gboolean
fb_mqtt_connected(FbMqtt *mqtt, gboolean error)
{
FbMqttPrivate *priv;
gboolean connected;
g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
priv = mqtt->priv;
connected = (priv->conn != NULL) && priv->connected;
if (!connected && error) {
fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Not connected"));
}
return connected;
}
void
fb_mqtt_disconnect(FbMqtt *mqtt)
{
FbMqttMessage *msg;
if (G_UNLIKELY(!fb_mqtt_connected(mqtt, FALSE))) {
return;
}
msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_DISCONNECT, 0);
fb_mqtt_write(mqtt, msg);
g_object_unref(msg);
fb_mqtt_close(mqtt);
}
void
fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload)
{
FbMqttMessage *msg;
FbMqttPrivate *priv;
g_return_if_fail(FB_IS_MQTT(mqtt));
g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
priv = mqtt->priv;
/* Message identifier not required, but for consistency use QoS1 */
msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH,
FB_MQTT_MESSAGE_FLAG_QOS1);
fb_mqtt_message_write_str(msg, topic); /* Message topic */
fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
if (pload != NULL) {
fb_mqtt_message_write(msg, pload->data, pload->len);
}
fb_mqtt_write(mqtt, msg);
g_object_unref(msg);
}
void
fb_mqtt_subscribe(FbMqtt *mqtt, ...)
{
const gchar *topic;
FbMqttMessage *msg;
FbMqttPrivate *priv;
guint16 qos;
va_list ap;
g_return_if_fail(FB_IS_MQTT(mqtt));
g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
priv = mqtt->priv;
/* Facebook requires a message identifier, use QoS1 */
msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE,
FB_MQTT_MESSAGE_FLAG_QOS1);
fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
va_start(ap, mqtt);
while ((topic = va_arg(ap, const gchar*)) != NULL) {
qos = va_arg(ap, guint);
fb_mqtt_message_write_str(msg, topic);
fb_mqtt_message_write_byte(msg, qos);
}
va_end(ap);
fb_mqtt_write(mqtt, msg);
g_object_unref(msg);
}
void
fb_mqtt_unsubscribe(FbMqtt *mqtt, const gchar *topic1, ...)
{
const gchar *topic;
FbMqttMessage *msg;
FbMqttPrivate *priv;
va_list ap;
g_return_if_fail(FB_IS_MQTT(mqtt));
g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
priv = mqtt->priv;
/* Facebook requires a message identifier, use QoS1 */
msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE,
FB_MQTT_MESSAGE_FLAG_QOS1);
fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
fb_mqtt_message_write_str(msg, topic1); /* First topic */
va_start(ap, topic1);
while ((topic = va_arg(ap, const gchar*)) != NULL) {
fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
}
va_end(ap);
fb_mqtt_write(mqtt, msg);
g_object_unref(msg);
}
FbMqttMessage *
fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags)
{
FbMqttMessage *msg;
FbMqttMessagePrivate *priv;
msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
priv = msg->priv;
priv->type = type;
priv->flags = flags;
priv->bytes = g_byte_array_new();
priv->local = TRUE;
return msg;
}
FbMqttMessage *
fb_mqtt_message_new_bytes(GByteArray *bytes)
{
FbMqttMessage *msg;
FbMqttMessagePrivate *priv;
guint8 *byte;
g_return_val_if_fail(bytes != NULL, NULL);
g_return_val_if_fail(bytes->len >= 2, NULL);
msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
priv = msg->priv;
priv->bytes = bytes;
priv->local = FALSE;
priv->type = (*bytes->data & 0xF0) >> 4;
priv->flags = *bytes->data & 0x0F;
/* Skip the fixed header */
for (byte = priv->bytes->data + 1; (*(byte++) & 128) != 0; );
priv->offset = byte - bytes->data;
priv->pos = priv->offset;
return msg;
}
void
fb_mqtt_message_reset(FbMqttMessage *msg)
{
FbMqttMessagePrivate *priv;
g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
priv = msg->priv;
if (priv->offset > 0) {
g_byte_array_remove_range(priv->bytes, 0, priv->offset);
priv->offset = 0;
priv->pos = 0;
}
}
const GByteArray *
fb_mqtt_message_bytes(FbMqttMessage *msg)
{
FbMqttMessagePrivate *priv;
guint i;
guint8 byte;
guint8 sbuf[4];
guint32 size;
g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL);
priv = msg->priv;
i = 0;
size = priv->bytes->len - priv->offset;
do {
if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) {
return NULL;
}
byte = size % 128;
size /= 128;
if (size > 0) {
byte |= 128;
}
sbuf[i++] = byte;
} while (size > 0);
fb_mqtt_message_reset(msg);
g_byte_array_prepend(priv->bytes, sbuf, i);
byte = ((priv->type & 0x0F) << 4) | (priv->flags & 0x0F);
g_byte_array_prepend(priv->bytes, &byte, sizeof byte);
priv->pos = (i + 1) * (sizeof byte);
return priv->bytes;
}
gboolean
fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size)
{
FbMqttMessagePrivate *priv;
g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
priv = msg->priv;
if ((priv->pos + size) > priv->bytes->len) {
return FALSE;
}
if ((data != NULL) && (size > 0)) {
memcpy(data, priv->bytes->data + priv->pos, size);
}
priv->pos += size;
return TRUE;
}
gboolean
fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes)
{
FbMqttMessagePrivate *priv;
guint size;
g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
priv = msg->priv;
size = priv->bytes->len - priv->pos;
if (G_LIKELY(size > 0)) {
g_byte_array_append(bytes, priv->bytes->data + priv->pos,
size);
}
return TRUE;
}
gboolean
fb_mqtt_message_read_byte(FbMqttMessage *msg, guint8 *value)
{
return fb_mqtt_message_read(msg, value, sizeof *value);
}
gboolean
fb_mqtt_message_read_mid(FbMqttMessage *msg, guint16 *value)
{
return fb_mqtt_message_read_u16(msg, value);
}
gboolean
fb_mqtt_message_read_u16(FbMqttMessage *msg, guint16 *value)
{
if (!fb_mqtt_message_read(msg, value, sizeof *value)) {
return FALSE;
}
if (value != NULL) {
*value = g_ntohs(*value);
}
return TRUE;
}
gboolean
fb_mqtt_message_read_str(FbMqttMessage *msg, gchar **value)
{
guint8 *data;
guint16 size;
if (!fb_mqtt_message_read_u16(msg, &size)) {
return FALSE;
}
if (value != NULL) {
data = g_new(guint8, size + 1);
data[size] = 0;
} else {
data = NULL;
}
if (!fb_mqtt_message_read(msg, data, size)) {
g_free(data);
return FALSE;
}
if (value != NULL) {
*value = (gchar *) data;
}
return TRUE;
}
void
fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size)
{
FbMqttMessagePrivate *priv;
g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
priv = msg->priv;
g_byte_array_append(priv->bytes, data, size);
priv->pos += size;
}
void
fb_mqtt_message_write_byte(FbMqttMessage *msg, guint8 value)
{
fb_mqtt_message_write(msg, &value, sizeof value);
}
void
fb_mqtt_message_write_mid(FbMqttMessage *msg, guint16 *value)
{
g_return_if_fail(value != NULL);
fb_mqtt_message_write_u16(msg, ++(*value));
}
void
fb_mqtt_message_write_u16(FbMqttMessage *msg, guint16 value)
{
value = g_htons(value);
fb_mqtt_message_write(msg, &value, sizeof value);
}
void
fb_mqtt_message_write_str(FbMqttMessage *msg, const gchar *value)
{
gint16 size;
g_return_if_fail(value != NULL);
size = strlen(value);
fb_mqtt_message_write_u16(msg, size);
fb_mqtt_message_write(msg, value, size);
}