pidgin/pidgin
Clone
Summary
Browse
Changes
Graph
Route GLib debug logging directly to the Finch debug window
2021-10-18, Elliott Sales de Andrade
1896a80ff8e3
Route GLib debug logging directly to the Finch debug window
Instead of flowing through purple debug, this merges some bits of the existing GLib log handler, and the purple debug printer.
Testing Done:
Open the Debug window an see some `GLib-*` outputs.
Reviewed at https://reviews.imfreedom.org/r/1057/
/* purple
*
* Purple is the legal property of its developers, whose names are too numerous
* to list here. Please refer to the COPYRIGHT file distributed with this
* source distribution.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
*/
#include
<glib/gi18n-lib.h>
#include
<glib/gprintf.h>
#include
<stdarg.h>
#include
<string.h>
#include
<purple.h>
#include
"mqtt.h"
#include
"util.h"
typedef
struct
{
PurpleConnection
*
gc
;
GIOStream
*
conn
;
GBufferedInputStream
*
input
;
PurpleQueuedOutputStream
*
output
;
GCancellable
*
cancellable
;
gboolean
connected
;
guint16
mid
;
GByteArray
*
rbuf
;
gsize
remz
;
gint
tev
;
}
FbMqttPrivate
;
/**
* FbMqtt:
*
* Represents an MQTT connection.
*/
struct
_FbMqtt
{
GObject
parent
;
FbMqttPrivate
*
priv
;
};
G_DEFINE_TYPE_WITH_PRIVATE
(
FbMqtt
,
fb_mqtt
,
G_TYPE_OBJECT
);
typedef
struct
{
FbMqttMessageType
type
;
FbMqttMessageFlags
flags
;
GByteArray
*
bytes
;
guint
offset
;
guint
pos
;
gboolean
local
;
}
FbMqttMessagePrivate
;
/**
* FbMqttMessage:
*
* Represents a reader/writer for an MQTT message.
*/
struct
_FbMqttMessage
{
GObject
parent
;
FbMqttMessagePrivate
*
priv
;
};
G_DEFINE_TYPE_WITH_PRIVATE
(
FbMqttMessage
,
fb_mqtt_message
,
G_TYPE_OBJECT
);
static
void
fb_mqtt_read_packet
(
FbMqtt
*
mqtt
);
static
void
fb_mqtt_dispose
(
GObject
*
obj
)
{
FbMqtt
*
mqtt
=
FB_MQTT
(
obj
);
FbMqttPrivate
*
priv
=
mqtt
->
priv
;
fb_mqtt_close
(
mqtt
);
g_byte_array_free
(
priv
->
rbuf
,
TRUE
);
}
static
void
fb_mqtt_class_init
(
FbMqttClass
*
klass
)
{
GObjectClass
*
gklass
=
G_OBJECT_CLASS
(
klass
);
gklass
->
dispose
=
fb_mqtt_dispose
;
/**
* FbMqtt::connect:
* @mqtt: The #FbMqtt.
*
* Emitted upon the successful completion of the connection
* process. This is emitted as a result of #fb_mqtt_connect().
*/
g_signal_new
(
"connect"
,
G_TYPE_FROM_CLASS
(
klass
),
G_SIGNAL_ACTION
,
0
,
NULL
,
NULL
,
NULL
,
G_TYPE_NONE
,
0
);
/**
* FbMqtt::error:
* @mqtt: The #FbMqtt.
* @error: The #GError.
*
* Emitted whenever an error is hit within the #FbMqtt. This
* should close the #FbMqtt with #fb_mqtt_close().
*/
g_signal_new
(
"error"
,
G_TYPE_FROM_CLASS
(
klass
),
G_SIGNAL_ACTION
,
0
,
NULL
,
NULL
,
NULL
,
G_TYPE_NONE
,
1
,
G_TYPE_ERROR
);
/**
* FbMqtt::open:
* @mqtt: The #FbMqtt.
*
* Emitted upon the successful opening of the remote socket.
* This is emitted as a result of #fb_mqtt_open(). This should
* call #fb_mqtt_connect().
*/
g_signal_new
(
"open"
,
G_TYPE_FROM_CLASS
(
klass
),
G_SIGNAL_ACTION
,
0
,
NULL
,
NULL
,
NULL
,
G_TYPE_NONE
,
0
);
/**
* FbMqtt::publish:
* @mqtt: The #FbMqtt.
* @topic: The topic.
* @pload: The payload.
*
* Emitted upon an incoming message from the steam.
*/
g_signal_new
(
"publish"
,
G_TYPE_FROM_CLASS
(
klass
),
G_SIGNAL_ACTION
,
0
,
NULL
,
NULL
,
NULL
,
G_TYPE_NONE
,
2
,
G_TYPE_STRING
,
G_TYPE_BYTE_ARRAY
);
}
static
void
fb_mqtt_init
(
FbMqtt
*
mqtt
)
{
FbMqttPrivate
*
priv
=
fb_mqtt_get_instance_private
(
mqtt
);
mqtt
->
priv
=
priv
;
priv
->
rbuf
=
g_byte_array_new
();
}
static
void
fb_mqtt_message_dispose
(
GObject
*
obj
)
{
FbMqttMessagePrivate
*
priv
=
FB_MQTT_MESSAGE
(
obj
)
->
priv
;
if
((
priv
->
bytes
!=
NULL
)
&&
priv
->
local
)
{
g_byte_array_free
(
priv
->
bytes
,
TRUE
);
}
}
static
void
fb_mqtt_message_class_init
(
FbMqttMessageClass
*
klass
)
{
GObjectClass
*
gklass
=
G_OBJECT_CLASS
(
klass
);
gklass
->
dispose
=
fb_mqtt_message_dispose
;
}
static
void
fb_mqtt_message_init
(
FbMqttMessage
*
msg
)
{
FbMqttMessagePrivate
*
priv
=
fb_mqtt_message_get_instance_private
(
msg
);
msg
->
priv
=
priv
;
}
GQuark
fb_mqtt_error_quark
(
void
)
{
static
GQuark
q
=
0
;
if
(
G_UNLIKELY
(
q
==
0
))
{
q
=
g_quark_from_static_string
(
"fb-mqtt-error-quark"
);
}
return
q
;
}
FbMqtt
*
fb_mqtt_new
(
PurpleConnection
*
gc
)
{
FbMqtt
*
mqtt
;
FbMqttPrivate
*
priv
;
g_return_val_if_fail
(
PURPLE_IS_CONNECTION
(
gc
),
NULL
);
mqtt
=
g_object_new
(
FB_TYPE_MQTT
,
NULL
);
priv
=
mqtt
->
priv
;
priv
->
gc
=
gc
;
return
mqtt
;
};
void
fb_mqtt_close
(
FbMqtt
*
mqtt
)
{
FbMqttPrivate
*
priv
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
priv
=
mqtt
->
priv
;
if
(
priv
->
tev
>
0
)
{
g_source_remove
(
priv
->
tev
);
priv
->
tev
=
0
;
}
if
(
priv
->
cancellable
!=
NULL
)
{
g_cancellable_cancel
(
priv
->
cancellable
);
g_clear_object
(
&
priv
->
cancellable
);
}
if
(
priv
->
conn
!=
NULL
)
{
purple_gio_graceful_close
(
priv
->
conn
,
G_INPUT_STREAM
(
priv
->
input
),
G_OUTPUT_STREAM
(
priv
->
output
));
g_clear_object
(
&
priv
->
input
);
g_clear_object
(
&
priv
->
output
);
g_clear_object
(
&
priv
->
conn
);
}
priv
->
connected
=
FALSE
;
g_byte_array_set_size
(
priv
->
rbuf
,
0
);
}
static
void
fb_mqtt_take_error
(
FbMqtt
*
mqtt
,
GError
*
err
,
const
gchar
*
prefix
)
{
if
(
g_error_matches
(
err
,
G_IO_ERROR
,
G_IO_ERROR_CANCELLED
))
{
/* Return as cancelled means the connection is closing */
g_error_free
(
err
);
return
;
}
/* Now we can check for programming errors */
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
if
(
prefix
!=
NULL
)
{
g_prefix_error
(
&
err
,
"%s: "
,
prefix
);
}
g_signal_emit_by_name
(
mqtt
,
"error"
,
err
);
g_error_free
(
err
);
}
static
void
fb_mqtt_error_literal
(
FbMqtt
*
mqtt
,
FbMqttError
error
,
const
gchar
*
msg
)
{
GError
*
err
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
err
=
g_error_new_literal
(
FB_MQTT_ERROR
,
error
,
msg
);
g_signal_emit_by_name
(
mqtt
,
"error"
,
err
);
g_error_free
(
err
);
}
void
fb_mqtt_error
(
FbMqtt
*
mqtt
,
FbMqttError
error
,
const
gchar
*
format
,
...)
{
GError
*
err
;
va_list
ap
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
va_start
(
ap
,
format
);
err
=
g_error_new_valist
(
FB_MQTT_ERROR
,
error
,
format
,
ap
);
va_end
(
ap
);
g_signal_emit_by_name
(
mqtt
,
"error"
,
err
);
g_error_free
(
err
);
}
static
gboolean
fb_mqtt_cb_timeout
(
gpointer
data
)
{
FbMqtt
*
mqtt
=
data
;
FbMqttPrivate
*
priv
=
mqtt
->
priv
;
priv
->
tev
=
0
;
fb_mqtt_error_literal
(
mqtt
,
FB_MQTT_ERROR_GENERAL
,
_
(
"Connection timed out"
));
return
FALSE
;
}
static
void
fb_mqtt_timeout_clear
(
FbMqtt
*
mqtt
)
{
FbMqttPrivate
*
priv
=
mqtt
->
priv
;
if
(
priv
->
tev
>
0
)
{
g_source_remove
(
priv
->
tev
);
priv
->
tev
=
0
;
}
}
static
void
fb_mqtt_timeout
(
FbMqtt
*
mqtt
)
{
FbMqttPrivate
*
priv
=
mqtt
->
priv
;
fb_mqtt_timeout_clear
(
mqtt
);
priv
->
tev
=
g_timeout_add_seconds
(
FB_MQTT_TIMEOUT_CONN
,
fb_mqtt_cb_timeout
,
mqtt
);
}
static
gboolean
fb_mqtt_cb_ping
(
gpointer
data
)
{
FbMqtt
*
mqtt
=
data
;
FbMqttMessage
*
msg
;
FbMqttPrivate
*
priv
=
mqtt
->
priv
;
msg
=
fb_mqtt_message_new
(
FB_MQTT_MESSAGE_TYPE_PINGREQ
,
0
);
fb_mqtt_write
(
mqtt
,
msg
);
g_object_unref
(
msg
);
priv
->
tev
=
0
;
fb_mqtt_timeout
(
mqtt
);
return
FALSE
;
}
static
void
fb_mqtt_ping
(
FbMqtt
*
mqtt
)
{
FbMqttPrivate
*
priv
=
mqtt
->
priv
;
fb_mqtt_timeout_clear
(
mqtt
);
priv
->
tev
=
g_timeout_add_seconds
(
FB_MQTT_TIMEOUT_PING
,
fb_mqtt_cb_ping
,
mqtt
);
}
static
void
fb_mqtt_cb_fill
(
GObject
*
source
,
GAsyncResult
*
res
,
gpointer
data
)
{
GBufferedInputStream
*
input
=
G_BUFFERED_INPUT_STREAM
(
source
);
FbMqtt
*
mqtt
=
data
;
gssize
ret
;
GError
*
err
=
NULL
;
ret
=
g_buffered_input_stream_fill_finish
(
input
,
res
,
&
err
);
if
(
ret
<
1
)
{
if
(
ret
==
0
)
{
err
=
g_error_new_literal
(
G_IO_ERROR
,
G_IO_ERROR_CONNECTION_CLOSED
,
_
(
"Connection closed"
));
}
fb_mqtt_take_error
(
mqtt
,
err
,
_
(
"Failed to read fixed header"
));
return
;
}
fb_mqtt_read_packet
(
mqtt
);
}
static
void
fb_mqtt_cb_read_packet
(
GObject
*
source
,
GAsyncResult
*
res
,
gpointer
data
)
{
FbMqtt
*
mqtt
=
data
;
FbMqttPrivate
*
priv
;
gssize
ret
;
FbMqttMessage
*
msg
;
GError
*
err
=
NULL
;
ret
=
g_input_stream_read_finish
(
G_INPUT_STREAM
(
source
),
res
,
&
err
);
if
(
ret
<
1
)
{
if
(
ret
==
0
)
{
err
=
g_error_new_literal
(
G_IO_ERROR
,
G_IO_ERROR_CONNECTION_CLOSED
,
_
(
"Connection closed"
));
}
fb_mqtt_take_error
(
mqtt
,
err
,
_
(
"Failed to read packet data"
));
return
;
}
priv
=
mqtt
->
priv
;
priv
->
remz
-=
ret
;
if
(
priv
->
remz
>
0
)
{
g_input_stream_read_async
(
G_INPUT_STREAM
(
source
),
priv
->
rbuf
->
data
+
priv
->
rbuf
->
len
-
priv
->
remz
,
priv
->
remz
,
G_PRIORITY_DEFAULT
,
priv
->
cancellable
,
fb_mqtt_cb_read_packet
,
mqtt
);
return
;
}
msg
=
fb_mqtt_message_new_bytes
(
priv
->
rbuf
);
if
(
G_UNLIKELY
(
msg
==
NULL
))
{
fb_mqtt_error_literal
(
mqtt
,
FB_MQTT_ERROR_GENERAL
,
_
(
"Failed to parse message"
));
return
;
}
fb_mqtt_read
(
mqtt
,
msg
);
g_object_unref
(
msg
);
/* Read another packet if connection wasn't reset in fb_mqtt_read() */
if
(
fb_mqtt_connected
(
mqtt
,
FALSE
))
{
fb_mqtt_read_packet
(
mqtt
);
}
}
static
void
fb_mqtt_read_packet
(
FbMqtt
*
mqtt
)
{
FbMqttPrivate
*
priv
=
mqtt
->
priv
;
const
guint8
*
buf
;
gsize
count
=
0
;
gsize
pos
;
guint
mult
=
1
;
guint8
byte
;
gsize
size
=
0
;
buf
=
g_buffered_input_stream_peek_buffer
(
priv
->
input
,
&
count
);
/* Start at 1 to skip the first byte */
pos
=
1
;
do
{
if
(
pos
>=
count
)
{
/* Not enough data yet, try again later */
g_buffered_input_stream_fill_async
(
priv
->
input
,
-1
,
G_PRIORITY_DEFAULT
,
priv
->
cancellable
,
fb_mqtt_cb_fill
,
mqtt
);
return
;
}
byte
=
*
(
buf
+
pos
++
);
size
+=
(
byte
&
127
)
*
mult
;
mult
*=
128
;
}
while
((
byte
&
128
)
!=
0
);
/* Add header to size */
size
+=
pos
;
g_byte_array_set_size
(
priv
->
rbuf
,
size
);
priv
->
remz
=
size
;
/* TODO: Use g_input_stream_read_all_async() when available. */
/* TODO: Alternately, it would be nice to let the
* FbMqttMessage directly use the GBufferedInputStream
* buffer instead of copying it, provided it's consumed
* before the next read.
*/
g_input_stream_read_async
(
G_INPUT_STREAM
(
priv
->
input
),
priv
->
rbuf
->
data
,
priv
->
rbuf
->
len
,
G_PRIORITY_DEFAULT
,
priv
->
cancellable
,
fb_mqtt_cb_read_packet
,
mqtt
);
}
void
fb_mqtt_read
(
FbMqtt
*
mqtt
,
FbMqttMessage
*
msg
)
{
FbMqttMessage
*
nsg
;
FbMqttPrivate
*
priv
;
FbMqttMessagePrivate
*
mriv
;
GByteArray
*
wytes
;
gchar
*
str
;
guint8
chr
;
guint16
mid
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
g_return_if_fail
(
FB_IS_MQTT_MESSAGE
(
msg
));
priv
=
mqtt
->
priv
;
mriv
=
msg
->
priv
;
fb_util_debug_hexdump
(
FB_UTIL_DEBUG_INFO
,
mriv
->
bytes
,
"Reading %d (flags: 0x%0X)"
,
mriv
->
type
,
mriv
->
flags
);
switch
(
mriv
->
type
)
{
case
FB_MQTT_MESSAGE_TYPE_CONNACK
:
if
(
!
fb_mqtt_message_read_byte
(
msg
,
NULL
)
||
!
fb_mqtt_message_read_byte
(
msg
,
&
chr
))
{
break
;
}
if
(
chr
!=
FB_MQTT_ERROR_SUCCESS
)
{
fb_mqtt_error
(
mqtt
,
chr
,
_
(
"Connection failed (%u)"
),
chr
);
return
;
}
priv
->
connected
=
TRUE
;
fb_mqtt_ping
(
mqtt
);
g_signal_emit_by_name
(
mqtt
,
"connect"
);
return
;
case
FB_MQTT_MESSAGE_TYPE_PUBLISH
:
if
(
!
fb_mqtt_message_read_str
(
msg
,
&
str
))
{
break
;
}
if
((
mriv
->
flags
&
FB_MQTT_MESSAGE_FLAG_QOS1
)
||
(
mriv
->
flags
&
FB_MQTT_MESSAGE_FLAG_QOS2
))
{
if
(
mriv
->
flags
&
FB_MQTT_MESSAGE_FLAG_QOS1
)
{
chr
=
FB_MQTT_MESSAGE_TYPE_PUBACK
;
}
else
{
chr
=
FB_MQTT_MESSAGE_TYPE_PUBREC
;
}
if
(
!
fb_mqtt_message_read_mid
(
msg
,
&
mid
))
{
g_free
(
str
);
break
;
}
nsg
=
fb_mqtt_message_new
(
chr
,
0
);
fb_mqtt_message_write_u16
(
nsg
,
mid
);
fb_mqtt_write
(
mqtt
,
nsg
);
g_object_unref
(
nsg
);
}
wytes
=
g_byte_array_new
();
fb_mqtt_message_read_r
(
msg
,
wytes
);
g_signal_emit_by_name
(
mqtt
,
"publish"
,
str
,
wytes
);
g_byte_array_free
(
wytes
,
TRUE
);
g_free
(
str
);
return
;
case
FB_MQTT_MESSAGE_TYPE_PUBREL
:
if
(
!
fb_mqtt_message_read_mid
(
msg
,
&
mid
))
{
break
;
}
nsg
=
fb_mqtt_message_new
(
FB_MQTT_MESSAGE_TYPE_PUBCOMP
,
0
);
fb_mqtt_message_write_u16
(
nsg
,
mid
);
/* Message identifier */
fb_mqtt_write
(
mqtt
,
nsg
);
g_object_unref
(
nsg
);
return
;
case
FB_MQTT_MESSAGE_TYPE_PINGRESP
:
fb_mqtt_ping
(
mqtt
);
return
;
case
FB_MQTT_MESSAGE_TYPE_PUBACK
:
case
FB_MQTT_MESSAGE_TYPE_PUBCOMP
:
case
FB_MQTT_MESSAGE_TYPE_SUBACK
:
case
FB_MQTT_MESSAGE_TYPE_UNSUBACK
:
return
;
default
:
fb_mqtt_error
(
mqtt
,
FB_MQTT_ERROR_GENERAL
,
_
(
"Unknown packet (%u)"
),
mriv
->
type
);
return
;
}
/* Since no case returned, there was a parse error. */
fb_mqtt_error_literal
(
mqtt
,
FB_MQTT_ERROR_GENERAL
,
_
(
"Failed to parse message"
));
}
static
void
fb_mqtt_cb_push_bytes
(
GObject
*
source
,
GAsyncResult
*
res
,
gpointer
data
)
{
PurpleQueuedOutputStream
*
stream
=
PURPLE_QUEUED_OUTPUT_STREAM
(
source
);
FbMqtt
*
mqtt
=
data
;
GError
*
err
=
NULL
;
if
(
!
purple_queued_output_stream_push_bytes_finish
(
stream
,
res
,
&
err
))
{
purple_queued_output_stream_clear_queue
(
stream
);
fb_mqtt_take_error
(
mqtt
,
err
,
_
(
"Failed to write data"
));
return
;
}
}
void
fb_mqtt_write
(
FbMqtt
*
mqtt
,
FbMqttMessage
*
msg
)
{
const
GByteArray
*
bytes
;
FbMqttMessagePrivate
*
mriv
;
FbMqttPrivate
*
priv
;
GBytes
*
gbytes
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
g_return_if_fail
(
FB_IS_MQTT_MESSAGE
(
msg
));
priv
=
mqtt
->
priv
;
mriv
=
msg
->
priv
;
bytes
=
fb_mqtt_message_bytes
(
msg
);
if
(
G_UNLIKELY
(
bytes
==
NULL
))
{
fb_mqtt_error_literal
(
mqtt
,
FB_MQTT_ERROR_GENERAL
,
_
(
"Failed to format data"
));
return
;
}
fb_util_debug_hexdump
(
FB_UTIL_DEBUG_INFO
,
mriv
->
bytes
,
"Writing %d (flags: 0x%0X)"
,
mriv
->
type
,
mriv
->
flags
);
/* TODO: Would be nice to refactor this to not require copying bytes */
gbytes
=
g_bytes_new
(
bytes
->
data
,
bytes
->
len
);
purple_queued_output_stream_push_bytes_async
(
priv
->
output
,
gbytes
,
G_PRIORITY_DEFAULT
,
priv
->
cancellable
,
fb_mqtt_cb_push_bytes
,
mqtt
);
g_bytes_unref
(
gbytes
);
}
static
void
fb_mqtt_cb_open
(
GObject
*
source
,
GAsyncResult
*
res
,
gpointer
data
)
{
FbMqtt
*
mqtt
=
data
;
FbMqttPrivate
*
priv
;
GSocketConnection
*
conn
;
GError
*
err
=
NULL
;
conn
=
g_socket_client_connect_to_host_finish
(
G_SOCKET_CLIENT
(
source
),
res
,
&
err
);
if
(
conn
==
NULL
)
{
fb_mqtt_take_error
(
mqtt
,
err
,
NULL
);
return
;
}
fb_mqtt_timeout_clear
(
mqtt
);
priv
=
mqtt
->
priv
;
priv
->
conn
=
G_IO_STREAM
(
conn
);
priv
->
input
=
G_BUFFERED_INPUT_STREAM
(
g_buffered_input_stream_new
(
g_io_stream_get_input_stream
(
priv
->
conn
)));
priv
->
output
=
purple_queued_output_stream_new
(
g_io_stream_get_output_stream
(
priv
->
conn
));
fb_mqtt_read_packet
(
mqtt
);
g_signal_emit_by_name
(
mqtt
,
"open"
);
}
void
fb_mqtt_open
(
FbMqtt
*
mqtt
,
const
gchar
*
host
,
gint
port
)
{
FbMqttPrivate
*
priv
;
PurpleAccount
*
acc
;
GSocketClient
*
client
;
GError
*
err
=
NULL
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
priv
=
mqtt
->
priv
;
acc
=
purple_connection_get_account
(
priv
->
gc
);
fb_mqtt_close
(
mqtt
);
client
=
purple_gio_socket_client_new
(
acc
,
&
err
);
if
(
client
==
NULL
)
{
fb_mqtt_take_error
(
mqtt
,
err
,
NULL
);
return
;
}
priv
->
cancellable
=
g_cancellable_new
();
g_socket_client_set_tls
(
client
,
TRUE
);
g_socket_client_connect_to_host_async
(
client
,
host
,
port
,
priv
->
cancellable
,
fb_mqtt_cb_open
,
mqtt
);
g_object_unref
(
client
);
fb_mqtt_timeout
(
mqtt
);
}
void
fb_mqtt_connect
(
FbMqtt
*
mqtt
,
guint8
flags
,
const
GByteArray
*
pload
)
{
FbMqttMessage
*
msg
;
g_return_if_fail
(
!
fb_mqtt_connected
(
mqtt
,
FALSE
));
g_return_if_fail
(
pload
!=
NULL
);
/* Facebook always sends a CONNACK, use QoS1 */
flags
|=
FB_MQTT_CONNECT_FLAG_QOS1
;
msg
=
fb_mqtt_message_new
(
FB_MQTT_MESSAGE_TYPE_CONNECT
,
0
);
fb_mqtt_message_write_str
(
msg
,
FB_MQTT_NAME
);
/* Protocol name */
fb_mqtt_message_write_byte
(
msg
,
FB_MQTT_LEVEL
);
/* Protocol level */
fb_mqtt_message_write_byte
(
msg
,
flags
);
/* Flags */
fb_mqtt_message_write_u16
(
msg
,
FB_MQTT_KA
);
/* Keep alive */
fb_mqtt_message_write
(
msg
,
pload
->
data
,
pload
->
len
);
fb_mqtt_write
(
mqtt
,
msg
);
fb_mqtt_timeout
(
mqtt
);
g_object_unref
(
msg
);
}
gboolean
fb_mqtt_connected
(
FbMqtt
*
mqtt
,
gboolean
error
)
{
FbMqttPrivate
*
priv
;
gboolean
connected
;
g_return_val_if_fail
(
FB_IS_MQTT
(
mqtt
),
FALSE
);
priv
=
mqtt
->
priv
;
connected
=
(
priv
->
conn
!=
NULL
)
&&
priv
->
connected
;
if
(
!
connected
&&
error
)
{
fb_mqtt_error_literal
(
mqtt
,
FB_MQTT_ERROR_GENERAL
,
_
(
"Not connected"
));
}
return
connected
;
}
void
fb_mqtt_disconnect
(
FbMqtt
*
mqtt
)
{
FbMqttMessage
*
msg
;
if
(
G_UNLIKELY
(
!
fb_mqtt_connected
(
mqtt
,
FALSE
)))
{
return
;
}
msg
=
fb_mqtt_message_new
(
FB_MQTT_MESSAGE_TYPE_DISCONNECT
,
0
);
fb_mqtt_write
(
mqtt
,
msg
);
g_object_unref
(
msg
);
fb_mqtt_close
(
mqtt
);
}
void
fb_mqtt_publish
(
FbMqtt
*
mqtt
,
const
gchar
*
topic
,
const
GByteArray
*
pload
)
{
FbMqttMessage
*
msg
;
FbMqttPrivate
*
priv
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
g_return_if_fail
(
fb_mqtt_connected
(
mqtt
,
FALSE
));
priv
=
mqtt
->
priv
;
/* Message identifier not required, but for consistency use QoS1 */
msg
=
fb_mqtt_message_new
(
FB_MQTT_MESSAGE_TYPE_PUBLISH
,
FB_MQTT_MESSAGE_FLAG_QOS1
);
fb_mqtt_message_write_str
(
msg
,
topic
);
/* Message topic */
fb_mqtt_message_write_mid
(
msg
,
&
priv
->
mid
);
/* Message identifier */
if
(
pload
!=
NULL
)
{
fb_mqtt_message_write
(
msg
,
pload
->
data
,
pload
->
len
);
}
fb_mqtt_write
(
mqtt
,
msg
);
g_object_unref
(
msg
);
}
void
fb_mqtt_subscribe
(
FbMqtt
*
mqtt
,
...)
{
const
gchar
*
topic
;
FbMqttMessage
*
msg
;
FbMqttPrivate
*
priv
;
guint16
qos
;
va_list
ap
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
g_return_if_fail
(
fb_mqtt_connected
(
mqtt
,
FALSE
));
priv
=
mqtt
->
priv
;
/* Facebook requires a message identifier, use QoS1 */
msg
=
fb_mqtt_message_new
(
FB_MQTT_MESSAGE_TYPE_SUBSCRIBE
,
FB_MQTT_MESSAGE_FLAG_QOS1
);
fb_mqtt_message_write_mid
(
msg
,
&
priv
->
mid
);
/* Message identifier */
va_start
(
ap
,
mqtt
);
while
((
topic
=
va_arg
(
ap
,
const
gchar
*
))
!=
NULL
)
{
qos
=
va_arg
(
ap
,
guint
);
fb_mqtt_message_write_str
(
msg
,
topic
);
fb_mqtt_message_write_byte
(
msg
,
qos
);
}
va_end
(
ap
);
fb_mqtt_write
(
mqtt
,
msg
);
g_object_unref
(
msg
);
}
void
fb_mqtt_unsubscribe
(
FbMqtt
*
mqtt
,
const
gchar
*
topic1
,
...)
{
const
gchar
*
topic
;
FbMqttMessage
*
msg
;
FbMqttPrivate
*
priv
;
va_list
ap
;
g_return_if_fail
(
FB_IS_MQTT
(
mqtt
));
g_return_if_fail
(
fb_mqtt_connected
(
mqtt
,
FALSE
));
priv
=
mqtt
->
priv
;
/* Facebook requires a message identifier, use QoS1 */
msg
=
fb_mqtt_message_new
(
FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE
,
FB_MQTT_MESSAGE_FLAG_QOS1
);
fb_mqtt_message_write_mid
(
msg
,
&
priv
->
mid
);
/* Message identifier */
fb_mqtt_message_write_str
(
msg
,
topic1
);
/* First topic */
va_start
(
ap
,
topic1
);
while
((
topic
=
va_arg
(
ap
,
const
gchar
*
))
!=
NULL
)
{
fb_mqtt_message_write_str
(
msg
,
topic
);
/* Remaining topics */
}
va_end
(
ap
);
fb_mqtt_write
(
mqtt
,
msg
);
g_object_unref
(
msg
);
}
FbMqttMessage
*
fb_mqtt_message_new
(
FbMqttMessageType
type
,
FbMqttMessageFlags
flags
)
{
FbMqttMessage
*
msg
;
FbMqttMessagePrivate
*
priv
;
msg
=
g_object_new
(
FB_TYPE_MQTT_MESSAGE
,
NULL
);
priv
=
msg
->
priv
;
priv
->
type
=
type
;
priv
->
flags
=
flags
;
priv
->
bytes
=
g_byte_array_new
();
priv
->
local
=
TRUE
;
return
msg
;
}
FbMqttMessage
*
fb_mqtt_message_new_bytes
(
GByteArray
*
bytes
)
{
FbMqttMessage
*
msg
;
FbMqttMessagePrivate
*
priv
;
guint8
*
byte
;
g_return_val_if_fail
(
bytes
!=
NULL
,
NULL
);
g_return_val_if_fail
(
bytes
->
len
>=
2
,
NULL
);
msg
=
g_object_new
(
FB_TYPE_MQTT_MESSAGE
,
NULL
);
priv
=
msg
->
priv
;
priv
->
bytes
=
bytes
;
priv
->
local
=
FALSE
;
priv
->
type
=
(
*
bytes
->
data
&
0xF0
)
>>
4
;
priv
->
flags
=
*
bytes
->
data
&
0x0F
;
/* Skip the fixed header */
for
(
byte
=
priv
->
bytes
->
data
+
1
;
(
*
(
byte
++
)
&
128
)
!=
0
;
);
priv
->
offset
=
byte
-
bytes
->
data
;
priv
->
pos
=
priv
->
offset
;
return
msg
;
}
void
fb_mqtt_message_reset
(
FbMqttMessage
*
msg
)
{
FbMqttMessagePrivate
*
priv
;
g_return_if_fail
(
FB_IS_MQTT_MESSAGE
(
msg
));
priv
=
msg
->
priv
;
if
(
priv
->
offset
>
0
)
{
g_byte_array_remove_range
(
priv
->
bytes
,
0
,
priv
->
offset
);
priv
->
offset
=
0
;
priv
->
pos
=
0
;
}
}
const
GByteArray
*
fb_mqtt_message_bytes
(
FbMqttMessage
*
msg
)
{
FbMqttMessagePrivate
*
priv
;
guint
i
;
guint8
byte
;
guint8
sbuf
[
4
];
guint32
size
;
g_return_val_if_fail
(
FB_IS_MQTT_MESSAGE
(
msg
),
NULL
);
priv
=
msg
->
priv
;
i
=
0
;
size
=
priv
->
bytes
->
len
-
priv
->
offset
;
do
{
if
(
G_UNLIKELY
(
i
>=
G_N_ELEMENTS
(
sbuf
)))
{
return
NULL
;
}
byte
=
size
%
128
;
size
/=
128
;
if
(
size
>
0
)
{
byte
|=
128
;
}
sbuf
[
i
++
]
=
byte
;
}
while
(
size
>
0
);
fb_mqtt_message_reset
(
msg
);
g_byte_array_prepend
(
priv
->
bytes
,
sbuf
,
i
);
byte
=
((
priv
->
type
&
0x0F
)
<<
4
)
|
(
priv
->
flags
&
0x0F
);
g_byte_array_prepend
(
priv
->
bytes
,
&
byte
,
sizeof
byte
);
priv
->
pos
=
(
i
+
1
)
*
(
sizeof
byte
);
return
priv
->
bytes
;
}
gboolean
fb_mqtt_message_read
(
FbMqttMessage
*
msg
,
gpointer
data
,
guint
size
)
{
FbMqttMessagePrivate
*
priv
;
g_return_val_if_fail
(
FB_IS_MQTT_MESSAGE
(
msg
),
FALSE
);
priv
=
msg
->
priv
;
if
((
priv
->
pos
+
size
)
>
priv
->
bytes
->
len
)
{
return
FALSE
;
}
if
((
data
!=
NULL
)
&&
(
size
>
0
))
{
memcpy
(
data
,
priv
->
bytes
->
data
+
priv
->
pos
,
size
);
}
priv
->
pos
+=
size
;
return
TRUE
;
}
gboolean
fb_mqtt_message_read_r
(
FbMqttMessage
*
msg
,
GByteArray
*
bytes
)
{
FbMqttMessagePrivate
*
priv
;
guint
size
;
g_return_val_if_fail
(
FB_IS_MQTT_MESSAGE
(
msg
),
FALSE
);
priv
=
msg
->
priv
;
size
=
priv
->
bytes
->
len
-
priv
->
pos
;
if
(
G_LIKELY
(
size
>
0
))
{
g_byte_array_append
(
bytes
,
priv
->
bytes
->
data
+
priv
->
pos
,
size
);
}
return
TRUE
;
}
gboolean
fb_mqtt_message_read_byte
(
FbMqttMessage
*
msg
,
guint8
*
value
)
{
return
fb_mqtt_message_read
(
msg
,
value
,
sizeof
*
value
);
}
gboolean
fb_mqtt_message_read_mid
(
FbMqttMessage
*
msg
,
guint16
*
value
)
{
return
fb_mqtt_message_read_u16
(
msg
,
value
);
}
gboolean
fb_mqtt_message_read_u16
(
FbMqttMessage
*
msg
,
guint16
*
value
)
{
if
(
!
fb_mqtt_message_read
(
msg
,
value
,
sizeof
*
value
))
{
return
FALSE
;
}
if
(
value
!=
NULL
)
{
*
value
=
g_ntohs
(
*
value
);
}
return
TRUE
;
}
gboolean
fb_mqtt_message_read_str
(
FbMqttMessage
*
msg
,
gchar
**
value
)
{
guint8
*
data
;
guint16
size
;
if
(
!
fb_mqtt_message_read_u16
(
msg
,
&
size
))
{
return
FALSE
;
}
if
(
value
!=
NULL
)
{
data
=
g_new
(
guint8
,
size
+
1
);
data
[
size
]
=
0
;
}
else
{
data
=
NULL
;
}
if
(
!
fb_mqtt_message_read
(
msg
,
data
,
size
))
{
g_free
(
data
);
return
FALSE
;
}
if
(
value
!=
NULL
)
{
*
value
=
(
gchar
*
)
data
;
}
return
TRUE
;
}
void
fb_mqtt_message_write
(
FbMqttMessage
*
msg
,
gconstpointer
data
,
guint
size
)
{
FbMqttMessagePrivate
*
priv
;
g_return_if_fail
(
FB_IS_MQTT_MESSAGE
(
msg
));
priv
=
msg
->
priv
;
g_byte_array_append
(
priv
->
bytes
,
data
,
size
);
priv
->
pos
+=
size
;
}
void
fb_mqtt_message_write_byte
(
FbMqttMessage
*
msg
,
guint8
value
)
{
fb_mqtt_message_write
(
msg
,
&
value
,
sizeof
value
);
}
void
fb_mqtt_message_write_mid
(
FbMqttMessage
*
msg
,
guint16
*
value
)
{
g_return_if_fail
(
value
!=
NULL
);
fb_mqtt_message_write_u16
(
msg
,
++
(
*
value
));
}
void
fb_mqtt_message_write_u16
(
FbMqttMessage
*
msg
,
guint16
value
)
{
value
=
g_htons
(
value
);
fb_mqtt_message_write
(
msg
,
&
value
,
sizeof
value
);
}
void
fb_mqtt_message_write_str
(
FbMqttMessage
*
msg
,
const
gchar
*
value
)
{
gint16
size
;
g_return_if_fail
(
value
!=
NULL
);
size
=
strlen
(
value
);
fb_mqtt_message_write_u16
(
msg
,
size
);
fb_mqtt_message_write
(
msg
,
value
,
size
);
}