--- a/libpurple/queuedoutputstream.c Sat Oct 08 04:03:41 2022 -0500
+++ b/libpurple/queuedoutputstream.c Sat Oct 08 04:54:55 2022 -0500
@@ -1,10 +1,6 @@
- * Purple is the legal property of its developers, whose names are too numerous
- * to list here. Please refer to the COPYRIGHT file distributed with this
+ * Purple - Internet Messaging Library + * Copyright (C) Pidgin Developers <devel@pidgin.im> * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -17,8 +13,7 @@
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
+ * along with this program; if not, see <https://www.gnu.org/licenses/>. #include "queuedoutputstream.h"
@@ -42,7 +37,8 @@
purple_queued_output_stream_push_bytes_async_cb(GObject *source,
- GAsyncResult *res, gpointer user_data)
GTask *task = G_TASK(user_data);
PurpleQueuedOutputStream *stream = g_task_get_source_object(task);
@@ -52,7 +48,7 @@
written = g_output_stream_write_bytes_finish(G_OUTPUT_STREAM(source),
bytes = g_task_get_task_data(task);
size = g_bytes_get_size(bytes);
@@ -64,8 +60,7 @@
} else if (size > written) {
/* Partial write, prepare to send remaining data */
bytes = g_bytes_new_from_bytes(bytes, written, size - written);
- g_task_set_task_data(task, bytes,
- (GDestroyNotify)g_bytes_unref);
+ g_task_set_task_data(task, bytes, (GDestroyNotify)g_bytes_unref); /* Full write, this task is finished */
g_task_return_boolean(task, TRUE);
@@ -93,29 +88,26 @@
-purple_queued_output_stream_start_push_bytes_async(GTask *task)
+purple_queued_output_stream_start_push_bytes_async(GTask *task) { PurpleQueuedOutputStream *stream = g_task_get_source_object(task);
- GOutputStream *base_stream;
+ GOutputStream *base_stream = NULL; + GFilterOutputStream *filtered = NULL; - base_stream = g_filter_output_stream_get_base_stream(
- G_FILTER_OUTPUT_STREAM(stream));
+ filtered = G_FILTER_OUTPUT_STREAM(stream); + base_stream = g_filter_output_stream_get_base_stream(filtered); - g_output_stream_write_bytes_async(base_stream,
- g_task_get_task_data(task),
- g_task_get_priority(task),
- g_task_get_cancellable(task),
- purple_queued_output_stream_push_bytes_async_cb,
+ g_output_stream_write_bytes_async(base_stream, g_task_get_task_data(task), + g_task_get_priority(task), + g_task_get_cancellable(task), + purple_queued_output_stream_push_bytes_async_cb, /******************************************************************************
*****************************************************************************/
-purple_queued_output_stream_dispose(GObject *object)
+purple_queued_output_stream_dispose(GObject *object) { PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
g_clear_pointer(&stream->queue, g_async_queue_unref);
@@ -124,16 +116,14 @@
-purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
+purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass) { GObjectClass *obj_class = G_OBJECT_CLASS(klass);
obj_class->dispose = purple_queued_output_stream_dispose;
-purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
+purple_queued_output_stream_init(PurpleQueuedOutputStream *stream) { stream->queue = g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
stream->pending_queued = FALSE;
@@ -141,25 +131,22 @@
/******************************************************************************
*****************************************************************************/
PurpleQueuedOutputStream *
-purple_queued_output_stream_new(GOutputStream *base_stream)
- PurpleQueuedOutputStream *stream;
+purple_queued_output_stream_new(GOutputStream *base_stream) { g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
- stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
- "base-stream", base_stream,
+ PURPLE_TYPE_QUEUED_OUTPUT_STREAM, + "base-stream", base_stream, purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream,
- GBytes *bytes, int io_priority, GCancellable *cancellable,
- GAsyncReadyCallback callback, gpointer user_data)
+ GBytes *bytes, int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, @@ -170,20 +157,19 @@
task = g_task_new(stream, cancellable, callback, user_data);
g_task_set_task_data(task, g_bytes_ref(bytes),
- (GDestroyNotify)g_bytes_unref);
- g_task_set_source_tag(task,
- purple_queued_output_stream_push_bytes_async);
+ (GDestroyNotify)g_bytes_unref); + g_task_set_source_tag(task, purple_queued_output_stream_push_bytes_async); g_task_set_priority(task, io_priority);
- set_pending = g_output_stream_set_pending(
- G_OUTPUT_STREAM(stream), &error);
+ set_pending = g_output_stream_set_pending(G_OUTPUT_STREAM(stream), &error); /* Since we're allowing queuing requests without blocking,
* it's not an error to be pending while processing queued operations.
- if (!set_pending && (!g_error_matches(error,
- G_IO_ERROR, G_IO_ERROR_PENDING) ||
- !stream->pending_queued)) {
+ (!g_error_matches(error, G_IO_ERROR, G_IO_ERROR_PENDING) || + !stream->pending_queued)) g_task_return_error(task, error);
@@ -192,7 +178,7 @@
stream->pending_queued = TRUE;
/* Start processing if there were no pending operations */
purple_queued_output_stream_start_push_bytes_async(task);
@@ -203,26 +189,27 @@
purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream *stream,
- GAsyncResult *result, GError **error)
g_return_val_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream), FALSE);
g_return_val_if_fail(g_task_is_valid(result, stream), FALSE);
g_return_val_if_fail(g_async_result_is_tagged(result,
- purple_queued_output_stream_push_bytes_async), FALSE);
+ purple_queued_output_stream_push_bytes_async), return g_task_propagate_boolean(G_TASK(result), error);
-purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream)
+purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream) { g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream));
- while ((task = g_async_queue_try_pop(stream->queue)) != NULL) {
+ while((task = g_async_queue_try_pop(stream->queue)) != NULL) { g_task_return_new_error(task, G_IO_ERROR, G_IO_ERROR_CANCELLED,
- "PurpleQueuedOutputStream queue cleared");
+ "PurpleQueuedOutputStream queue cleared"); --- a/libpurple/queuedoutputstream.h Sat Oct 08 04:03:41 2022 -0500
+++ b/libpurple/queuedoutputstream.h Sat Oct 08 04:54:55 2022 -0500
@@ -1,10 +1,6 @@
- * Purple is the legal property of its developers, whose names are too numerous
- * to list here. Please refer to the COPYRIGHT file distributed with this
+ * Purple - Internet Messaging Library + * Copyright (C) Pidgin Developers <devel@pidgin.im> * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -17,8 +13,7 @@
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
+ * along with this program; if not, see <https://www.gnu.org/licenses/>. #if !defined(PURPLE_GLOBAL_HEADER_INSIDE) && !defined(PURPLE_COMPILATION)
@@ -50,68 +45,73 @@
* queued with [method@QueuedOutputStream.clear_queue] to avoid excessive
* errors returned in [method@QueuedOutputStream.push_bytes_async]'s async
-G_DECLARE_FINAL_TYPE(PurpleQueuedOutputStream,
- purple_queued_output_stream, PURPLE,
- QUEUED_OUTPUT_STREAM, GFilterOutputStream)
+G_DECLARE_FINAL_TYPE(PurpleQueuedOutputStream, purple_queued_output_stream, + PURPLE, QUEUED_OUTPUT_STREAM, GFilterOutputStream)
- * purple_queued_output_stream_new
+ * purple_queued_output_stream_new: * @base_stream: Base output stream to wrap with the queued stream
* Creates a new queued output stream for a base stream.
+ * Returns: (transfer full): The new stream. -PurpleQueuedOutputStream *purple_queued_output_stream_new(
- GOutputStream *base_stream);
+PurpleQueuedOutputStream *purple_queued_output_stream_new(GOutputStream *base_stream);
- * purple_queued_output_stream_push_bytes_async
- * @stream: #PurpleQueuedOutputStream to push bytes to
- * @bytes: Bytes to queue
- * @priority: IO priority of the request
- * @cancellable: (allow-none): Optional #GCancellable object, NULL to ignore
- * @callback: (scope async): Callback to call when the request is finished
- * @user_data: (closure): Data to pass to the callback function
+ * purple_queued_output_stream_push_bytes_async: + * @stream: The instance. + * @bytes: The bytes to queue. + * @priority: IO priority of the request. + * @cancellable: (nullable): A [class@Gio.Cancellable] or %NULL. + * @callback: (scope async): Callback to call when the request is finished. + * @data: (closure): Data to pass to @callback. - * Asynchronously queues and then writes data to the output stream.
- * Once the data has been written, or an error occurs, the callback
+ * Asynchronously queues and then writes data to @stream. Once the data has + * been written, or an error occurs, @callback will be called. * Be careful such that if there's a fatal stream error, all remaining queued
* operations will likely return this error. Use
- * #purple_queued_output_stream_clear_queue() to clear the queue on such
+ * [method@Purple.QueuedOutputStream.clear_queue] to clear the queue on such * an error to only report it a single time.
-void purple_queued_output_stream_push_bytes_async(
- PurpleQueuedOutputStream *stream, GBytes *bytes,
- int io_priority, GCancellable *cancellable,
- GAsyncReadyCallback callback, gpointer user_data);
+void purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream, GBytes *bytes, int priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer data);
- * purple_queued_output_stream_push_bytes_finish
- * @stream: #PurpleQueuedOutputStream bytes were pushed to
- * @result: The #GAsyncResult of this operation
- * @error: A GError location to store the error, or NULL to ignore
+ * purple_queued_output_stream_push_bytes_finish: + * @stream: The instance. + * @result: The [iface@Gio.AsyncResult] of this operation. + * @error: (nullable) (optional): A [type@Glib.Error] location to store the + * error, or NULL to ignore * Finishes pushing bytes asynchronously.
* Returns: %TRUE on success, %FALSE if there was an error
-gboolean purple_queued_output_stream_push_bytes_finish(
- PurpleQueuedOutputStream *stream,
- GAsyncResult *result, GError **error);
+gboolean purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream *stream, GAsyncResult *result, GError **error);
- * purple_queued_output_stream_clear_queue
- * @stream: #PurpleQueuedOutputStream to clear
+ * purple_queued_output_stream_clear_queue: + * @stream: The instance. + * Clears the queue of any pending bytes. However, any bytes that are in the + * process of being sent will finish their operation. - * Clears the queue of any pending bytes. However, any bytes that are
- * in the process of being sent will finish their operation.
+ * This function is useful for clearing the queue in case of an IO error. Call + * this in the async callback in order to clear the queue and avoid having all + * [method@Purple.QueuedOutputStream.push_bytes_async] calls on @stream return + * errors if there's a fatal stream error. - * This function is useful for clearing the queue in case of an IO error.
- * Call this in the async callback in order to clear the queue and avoid
- * having all #purple_queue_output_stream_push_bytes_async() calls on
- * this queue return errors if there's a fatal stream error.
void purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream);