Thu, 13 Mar 2025 16:04:57 -0500
Update gi-docgen to 2025.3
Testing Done:
Called in the turtles.
Reviewed at https://reviews.imfreedom.org/r/3886/
/*
 * Copyright (C) 2023-2025 Xeme Developers
 *
 * Xeme is the legal property of its developers, whose names are too
 * numerous to list here. Please refer to the AUTHORS file distributed
 * with this source distribution
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, see
 * <https://www.gnu.org/licenses/>.
 */

#include "xemeinputstream.h"

#include "xemecore.h"
#include "xemeextensionmap.h"
#include "xemestring.h"

/* Maximum number of bytes requested from the input per asynchronous read. */
const gsize XEME_INPUT_STREAM_BUFFER_LEN = 4096;

enum {
	PROP_0,
	PROP_OUTPUT_STREAM,
	N_PROPERTIES,
};
static GParamSpec *properties[N_PROPERTIES] = {NULL, };

enum {
	SIG_CLOSED,
	SIG_RESTART_REQUESTED,
	N_SIGNALS,
};
static guint signals[N_SIGNALS] = {0, };

typedef struct {
	/* TRUE between a successful start() and the next stop(). */
	gboolean running;

	/* The stream being read; referenced in start(), released once closed. */
	GInputStream *input;

	/* The output stream handed to start(); exposed as "output-stream". */
	XemeOutputStream *output;

	/* Markup parser that every chunk of read data is fed into. */
	GMarkupParseContext *context;

	/* Receives the parsing of <stream:features/> elements. */
	XemeExtensionMap *features;
} XemeInputStreamPrivate;

G_DEFINE_TYPE_WITH_PRIVATE(XemeInputStream, xeme_input_stream,
                           XEME_TYPE_STREAM)

/******************************************************************************
 * Helpers
 *****************************************************************************/

/*
 * Completion callback for the asynchronous reads issued by
 * xeme_input_stream_start() and by this callback itself.
 *
 * The bytes that were read are fed to the markup parser and, as long as data
 * was received and the stream is still running, another read is queued. A
 * failed read or a parse error stops the stream.
 *
 * source: The GInputStream the read was issued on.
 * result: The result of the asynchronous read.
 * data: The XemeInputStream that issued the read.
 */
static void
xeme_input_stream_read_bytes_async_cb(GObject *source, GAsyncResult *result,
                                      gpointer data)
{
	XemeInputStream *stream = data;
	XemeInputStreamPrivate *priv = NULL;
	GBytes *bytes = NULL;
	GError *error = NULL;
	const char *raw = NULL;
	gsize length = 0;

	priv = xeme_input_stream_get_instance_private(stream);

	/* Finish the operation on the object that actually ran it rather than on
	 * priv->input, which stop() may have cleared in the meantime.
	 */
	bytes = g_input_stream_read_bytes_finish(G_INPUT_STREAM(source), result,
	                                         &error);
	if(bytes == NULL) {
		g_message("failed to read: %s",
		          error != NULL ? error->message : "unknown error");
		g_clear_error(&error);

		xeme_input_stream_stop(stream, NULL);

		return;
	}

	raw = g_bytes_get_data(bytes, &length);

	/* An empty read carries nothing to parse, and g_bytes_get_data() is
	 * allowed to return NULL for it, which the parser rejects without setting
	 * an error. Only hand real data to the parser.
	 */
	if(length > 0 &&
	   !g_markup_parse_context_parse(priv->context, raw, length, &error))
	{
		g_warning("we hit an error: %s",
		          error != NULL ? error->message : "unknown error");

		/* Release everything we own before bailing out. */
		g_clear_error(&error);
		g_bytes_unref(bytes);

		xeme_input_stream_stop(stream, NULL);

		return;
	}

	/* Unref the data we read. */
	g_bytes_unref(bytes);

	/* Make another asynchronous read call. A parser callback may have stopped
	 * the stream, and an empty read means there is nothing more to read, so
	 * both conditions are checked.
	 */
	if(length > 0 && priv->running) {
		GCancellable *cancellable = NULL;

		cancellable = xeme_stream_get_cancellable(XEME_STREAM(stream));

		g_input_stream_read_bytes_async(priv->input,
		                                XEME_INPUT_STREAM_BUFFER_LEN,
		                                G_PRIORITY_DEFAULT,
		                                cancellable,
		                                xeme_input_stream_read_bytes_async_cb,
		                                stream);
	}
}

/******************************************************************************
 * Parser
 *****************************************************************************/

/*
 * Handles the attributes of an opening <stream:stream> element.
 *
 * The "to", "from" and "version" attributes are required; "id", "xml:lang",
 * "xmlns" and "xmlns:stream" are collected as optional. The id (if not
 * empty), to, and from values are stored on the stream before the version,
 * and then the language (if not empty) is stored before the namespaces are
 * validated. On any validation failure @error is set and the function
 * returns.
 */
static void
xeme_input_stream_parse_stream(XemeInputStream *input_stream,
                               const char *element_name,
                               const char **attribute_names,
                               const char **attribute_values,
                               GError **error)
{
	XemeStream *stream = XEME_STREAM(input_stream);
	gboolean ret = FALSE;
	const char *id = NULL;
	const char *to = NULL;
	const char *from = NULL;
	const char *version = NULL;
	const char *xml_lang = NULL;
	const char *xmlns = NULL;
	const char *xmlns_stream = NULL;

	ret = g_markup_collect_attributes(
		element_name, attribute_names, attribute_values, error,
		G_MARKUP_COLLECT_STRING | G_MARKUP_COLLECT_OPTIONAL, "id", &id,
		G_MARKUP_COLLECT_STRING, "to", &to,
		G_MARKUP_COLLECT_STRING, "from", &from,
		G_MARKUP_COLLECT_STRING, "version", &version,
		G_MARKUP_COLLECT_STRING | G_MARKUP_COLLECT_OPTIONAL, "xml:lang", &xml_lang,
		G_MARKUP_COLLECT_STRING | G_MARKUP_COLLECT_OPTIONAL, "xmlns", &xmlns,
		G_MARKUP_COLLECT_STRING | G_MARKUP_COLLECT_OPTIONAL, "xmlns:stream", &xmlns_stream,
		G_MARKUP_COLLECT_INVALID);

	if(!ret) {
		/* g_markup_collect_attributes() has already set @error. */
		return;
	}

	if(!xeme_str_is_empty(id)) {
		xeme_stream_set_id(stream, id);
	}

	xeme_stream_set_to(stream, to);
	xeme_stream_set_from(stream, from);

	if(!xeme_str_equal(version, XEME_STREAM_VERSION)) {
		g_set_error(error, XEME_DOMAIN, 0,
		            "got version='%s' but we were expecting '%s'",
		            version, XEME_STREAM_VERSION);

		return;
	}

	if(!xeme_str_is_empty(xml_lang)) {
		xeme_stream_set_language(stream, xml_lang);
	}

	/* The namespace attributes are optional, so they may still be NULL here;
	 * never hand a NULL pointer to a %s conversion.
	 */
	if(!xeme_str_equal(xmlns, XEME_STREAM_XMLNS)) {
		g_set_error(error, XEME_DOMAIN, 0,
		            "got xmlns='%s' but was expecting '%s'",
		            xmlns != NULL ? xmlns : "(null)", XEME_STREAM_XMLNS);

		return;
	}

	if(!xeme_str_equal(xmlns_stream, XEME_STREAM_XMLNS_STREAM)) {
		g_set_error(error, XEME_DOMAIN, 0,
		            "got xmlns:stream='%s' but was expecting '%s'",
		            xmlns_stream != NULL ? xmlns_stream : "(null)",
		            XEME_STREAM_XMLNS_STREAM);

		return;
	}
}

/*
 * GMarkupParser start_element handler for the top level of the stream.
 *
 * <stream:stream> is handled by xeme_input_stream_parse_stream() and
 * <stream:features> is passed to the extension map; any other element is
 * reported with a warning.
 */
static void
xeme_connection_parser_element_start(GMarkupParseContext *context,
                                     const char *name,
                                     const char **attribute_names,
                                     const char **attribute_values,
                                     gpointer data,
                                     GError **error)
{
	XemeInputStream *stream = data;
	XemeInputStreamPrivate *priv = NULL;

	priv = xeme_input_stream_get_instance_private(stream);

	if(xeme_str_equal(name, "stream:stream")) {
		xeme_input_stream_parse_stream(stream, name, attribute_names,
		                               attribute_values, error);
	} else if(xeme_str_equal(name, "stream:features")) {
		xeme_extension_map_parse_start(priv->features, stream, context);
	} else {
		g_warning("unexpected element start '%s'", name);
	}
}

/*
 * GMarkupParser end_element handler for the top level of the stream.
 *
 * A closing </stream:stream> stops the input stream; any other element is
 * reported with a warning.
 */
static void
xeme_connection_parser_element_end(G_GNUC_UNUSED GMarkupParseContext *context,
                                   const char *name,
                                   gpointer data,
                                   GError **error)
{
	XemeInputStream *stream = data;

	if(xeme_str_equal(name, "stream:stream")) {
		/* The connection is being terminated. */
		xeme_input_stream_stop(stream, error);

		return;
	} else {
		g_warning("unexpected element end '%s'", name);
	}
}

static GMarkupParser parser = {
	.start_element = xeme_connection_parser_element_start,
	.end_element = xeme_connection_parser_element_end,
};

/******************************************************************************
 * GObject Implementation
 *****************************************************************************/

/* Releases the parse context and every object reference held by the
 * instance before chaining up to the parent class.
 */
static void
xeme_input_stream_finalize(GObject *obj) {
	XemeInputStream *stream = XEME_INPUT_STREAM(obj);
	XemeInputStreamPrivate *priv = NULL;

	priv = xeme_input_stream_get_instance_private(stream);

	g_clear_pointer(&priv->context, g_markup_parse_context_free);
	g_clear_object(&priv->features);
	g_clear_object(&priv->input);
	g_clear_object(&priv->output);

	G_OBJECT_CLASS(xeme_input_stream_parent_class)->finalize(obj);
}

/* GObject property getter; "output-stream" is the only property. */
static void
xeme_input_stream_get_property(GObject *obj, guint param_id, GValue *value,
                               GParamSpec *pspec)
{
	XemeInputStream *stream = XEME_INPUT_STREAM(obj);

	switch(param_id) {
	case PROP_OUTPUT_STREAM:
		g_value_set_object(value,
		                   xeme_input_stream_get_output_stream(stream));
		break;
	default:
		G_OBJECT_WARN_INVALID_PROPERTY_ID(obj, param_id, pspec);
		break;
	}
}

/* Instance initializer: creates the markup parse context, with the instance
 * as its user data, and the extension map used for stream features.
 */
static void
xeme_input_stream_init(XemeInputStream *stream) {
	XemeInputStreamPrivate *priv = NULL;

	priv = xeme_input_stream_get_instance_private(stream);

	priv->running = FALSE;
	priv->context = g_markup_parse_context_new(&parser, 0, stream, NULL);
	priv->features = xeme_extension_map_new();
}

/* Class initializer: wires up the vfuncs and registers the property and the
 * signals.
 */
static void
xeme_input_stream_class_init(XemeInputStreamClass *klass) {
	GObjectClass *obj_class = G_OBJECT_CLASS(klass);

	obj_class->finalize = xeme_input_stream_finalize;
	obj_class->get_property = xeme_input_stream_get_property;

	/**
	 * XemeInputStream:output-stream:
	 *
	 * The output stream that should be used for requests and responses.
	 *
	 * This is only set after [method@InputStream.start] is called.
	 *
	 * Since: 0.1
	 */
	properties[PROP_OUTPUT_STREAM] = g_param_spec_object(
		"output-stream", NULL, NULL,
		XEME_TYPE_OUTPUT_STREAM,
		G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);

	g_object_class_install_properties(obj_class, N_PROPERTIES, properties);

	/**
	 * XemeInputStream::closed:
	 *
	 * Emitted when the input stream is closed.
	 *
	 * Since: 0.1
	 */
	signals[SIG_CLOSED] = g_signal_new_class_handler(
		"closed",
		G_OBJECT_CLASS_TYPE(klass),
		G_SIGNAL_RUN_LAST,
		NULL,
		NULL,
		NULL,
		NULL,
		G_TYPE_NONE,
		0);

	/**
	 * XemeInputStream::restart-requested:
	 *
	 * Emitted when the remote side has requested that the output stream be
	 * restarted.
	 *
	 * This is typically requested during feature negotiation which happens
	 * during the initial connection.
	 *
	 * Since: 0.1
	 */
	signals[SIG_RESTART_REQUESTED] = g_signal_new_class_handler(
		"restart-requested",
		G_OBJECT_CLASS_TYPE(klass),
		G_SIGNAL_RUN_LAST,
		NULL,
		NULL,
		NULL,
		NULL,
		G_TYPE_NONE,
		0);
}

/******************************************************************************
 * Public API
 *****************************************************************************/

/* Creates a new input stream that has not been started. */
XemeInputStream *
xeme_input_stream_new(void) {
	return g_object_new(XEME_TYPE_INPUT_STREAM, NULL);
}

/*
 * Starts reading from @input.
 *
 * The stream's cancellable is reset, @output is stored (notifying
 * "output-stream" when it changes), a reference to @input is taken, and the
 * first asynchronous read is queued.
 *
 * Returns TRUE on success. Returns FALSE with @error set if the stream has
 * already been started.
 */
gboolean
xeme_input_stream_start(XemeInputStream *stream, GInputStream *input,
                        XemeOutputStream *output, GError **error)
{
	XemeInputStreamPrivate *priv = NULL;
	GCancellable *cancellable = NULL;

	g_return_val_if_fail(XEME_IS_INPUT_STREAM(stream), FALSE);
	g_return_val_if_fail(G_IS_INPUT_STREAM(input), FALSE);
	g_return_val_if_fail(XEME_IS_OUTPUT_STREAM(output), FALSE);
	g_return_val_if_fail(error == NULL || *error == NULL, FALSE);

	priv = xeme_input_stream_get_instance_private(stream);

	if(priv->running) {
		g_set_error_literal(error, XEME_DOMAIN, 0,
		                    "input stream has already been started");

		return FALSE;
	}

	cancellable = xeme_stream_get_cancellable(XEME_STREAM(stream));
	if(G_IS_CANCELLABLE(cancellable)) {
		g_cancellable_reset(cancellable);
	}

	/* Store the output stream. */
	if(g_set_object(&priv->output, output)) {
		g_object_notify_by_pspec(G_OBJECT(stream),
		                         properties[PROP_OUTPUT_STREAM]);
	}

	/* Setup the read handler. g_set_object() drops any input left behind by
	 * a stop() whose close failed instead of leaking that reference.
	 */
	g_set_object(&priv->input, input);
	g_input_stream_read_bytes_async(priv->input,
	                                XEME_INPUT_STREAM_BUFFER_LEN,
	                                G_PRIORITY_DEFAULT,
	                                cancellable,
	                                xeme_input_stream_read_bytes_async_cb,
	                                stream);

	priv->running = TRUE;

	return TRUE;
}

/*
 * Stops the stream.
 *
 * Does nothing and returns TRUE if the stream is not running. Otherwise the
 * cancellable is cancelled, the stream is marked as stopped, and the input
 * is closed. When the close succeeds the "closed" signal is emitted and the
 * input is released.
 *
 * Returns the result of closing the input; on failure @error is set by
 * g_input_stream_close().
 */
gboolean
xeme_input_stream_stop(XemeInputStream *stream, GError **error) {
	XemeInputStreamPrivate *priv = NULL;
	GCancellable *cancellable = NULL;
	gboolean ret = FALSE;

	g_return_val_if_fail(XEME_IS_INPUT_STREAM(stream), FALSE);

	priv = xeme_input_stream_get_instance_private(stream);

	if(!priv->running) {
		return TRUE;
	}

	cancellable = xeme_stream_get_cancellable(XEME_STREAM(stream));
	if(G_IS_CANCELLABLE(cancellable) &&
	   !g_cancellable_is_cancelled(cancellable))
	{
		g_cancellable_cancel(cancellable);
	}

	/* Cleared before closing so that the read callback does not queue another
	 * read and nested calls to stop return early.
	 */
	priv->running = FALSE;

	ret = g_input_stream_close(priv->input, NULL, error);
	if(ret) {
		g_signal_emit(stream, signals[SIG_CLOSED], 0);
		g_clear_object(&priv->input);
	}

	return ret;
}

/* Emits the "restart-requested" signal on @stream. */
void
xeme_input_stream_restart_requested(XemeInputStream *stream) {
	g_return_if_fail(XEME_IS_INPUT_STREAM(stream));

	g_signal_emit(stream, signals[SIG_RESTART_REQUESTED], 0);
}

/* Returns the output stream stored by xeme_input_stream_start(), or NULL if
 * none has been stored. No reference is added for the caller.
 */
XemeOutputStream *
xeme_input_stream_get_output_stream(XemeInputStream *stream) {
	XemeInputStreamPrivate *priv = NULL;

	g_return_val_if_fail(XEME_IS_INPUT_STREAM(stream), NULL);

	priv = xeme_input_stream_get_instance_private(stream);

	return priv->output;
}