From af64f799aa7764680df0c81b91d37142b9e71514 Mon Sep 17 00:00:00 2001 From: jilei Date: Mon, 29 Dec 2025 12:07:03 +0800 Subject: [PATCH] umq support modify queue state api --- .../include/umq/transport_layer/umq_tp_api.h | 9 +++++++++ src/urpc/include/umq/umq_api.h | 9 +++++++++ src/urpc/umq/umq_api.c | 11 +++++++++++ src/urpc/umq/umq_ub/umq_ub.c | 6 ++++++ src/urpc/umq/umq_ub/umq_ub_impl.c | 17 +++++++++++++++++ src/urpc/umq/umq_ub/umq_ub_impl.h | 1 + src/urpc/umq/umq_ub/umq_ub_plus.c | 6 ++++++ 7 files changed, 59 insertions(+) diff --git a/src/urpc/include/umq/transport_layer/umq_tp_api.h b/src/urpc/include/umq/transport_layer/umq_tp_api.h index 55ad817..b2a5b43 100644 --- a/src/urpc/include/umq/transport_layer/umq_tp_api.h +++ b/src/urpc/include/umq/transport_layer/umq_tp_api.h @@ -79,6 +79,15 @@ typedef struct umq_ops { */ int (*umq_tp_unbind)(uint64_t umqh_tp); + /** + * User should ensure thread safety if io_lock_free is true + * Set umq state + * @param[in] umqh_tp: umq handle + * @param[in] state: umq state want to set(Only Support Set ERR STATE) + * Return 0 on success, error code on failure + */ + int (*umq_tp_state_set)(uint64_t umqh_tp, umq_state_t state); + /** * User should ensure thread safety if io_lock_free is true * Query umq state diff --git a/src/urpc/include/umq/umq_api.h b/src/urpc/include/umq/umq_api.h index 44f4d7b..992bab2 100644 --- a/src/urpc/include/umq/umq_api.h +++ b/src/urpc/include/umq/umq_api.h @@ -79,6 +79,15 @@ int umq_bind(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); */ int umq_unbind(uint64_t umqh); +/** + * User should ensure thread safety if io_lock_free is true + * Set umq state + * @param[in] umqh: umq handle + * @param[in] state: umq state want to set (Only Support Set ERR STATE) + * Return 0 on success, error code on failure + */ +int umq_state_set(uint64_t umqh, umq_state_t state); + /** * User should ensure thread safety if io_lock_free is true * Query umq state diff --git a/src/urpc/umq/umq_api.c b/src/urpc/umq/umq_api.c index 1642657..433c4a8 100644 --- a/src/urpc/umq/umq_api.c +++ b/src/urpc/umq/umq_api.c @@ -889,6 +889,17 @@ int umq_buf_split(umq_buf_t *head, umq_buf_t *node) return UMQ_SUCCESS; } +int umq_state_set(uint64_t umqh, umq_state_t state) +{ + umq_t *umq = (umq_t *)(uintptr_t)umqh; + if (umq == NULL || umq->umqh_tp == UMQ_INVALID_HANDLE || umq->tp_ops->umq_tp_state_set == NULL) { + UMQ_VLOG_ERR("parameter invalid\n"); + return -UMQ_ERR_EINVAL; + } + + return umq->tp_ops->umq_tp_state_set(umq->umqh_tp, state); +} + umq_state_t umq_state_get(uint64_t umqh) { umq_t *umq = (umq_t *)(uintptr_t)umqh; diff --git a/src/urpc/umq/umq_ub/umq_ub.c b/src/urpc/umq/umq_ub/umq_ub.c index b67a712..9aed7a7 100644 --- a/src/urpc/umq/umq_ub/umq_ub.c +++ b/src/urpc/umq/umq_ub/umq_ub.c @@ -69,6 +69,11 @@ static int umq_tp_ub_unbind(uint64_t umqh_tp) return umq_ub_unbind_impl(umqh_tp); } +static int umq_tp_ub_state_set(uint64_t umqh_tp, umq_state_t state) +{ + return umq_ub_state_set_impl(umqh_tp, state); +} + static umq_state_t umq_tp_ub_state_get(uint64_t umqh_tp) { return umq_ub_state_get_impl(umqh_tp); @@ -179,6 +184,7 @@ static umq_ops_t g_umq_ub_ops = { .umq_tp_bind_info_get = umq_tp_ub_bind_info_get, .umq_tp_bind = umq_tp_ub_bind, .umq_tp_unbind = umq_tp_ub_unbind, + .umq_tp_state_set = umq_tp_ub_state_set, .umq_tp_state_get = umq_tp_ub_state_get, .umq_tp_log_config_set = umq_tp_ub_log_config_set, .umq_tp_log_config_reset = umq_tp_ub_log_config_reset, diff --git a/src/urpc/umq/umq_ub/umq_ub_impl.c b/src/urpc/umq/umq_ub/umq_ub_impl.c index 5301dff..893c16e 100644 --- a/src/urpc/umq/umq_ub/umq_ub_impl.c +++ b/src/urpc/umq/umq_ub/umq_ub_impl.c @@ -4364,6 +4364,23 @@ util_id_allocator_t *umq_ub_get_msg_id_generator(uint64_t umqh_tp) return &g_umq_ub_id_allocator; } +int umq_ub_state_set_impl(uint64_t umqh_tp, umq_state_t state) +{ + // todo add only support main umq judgement + if (state != QUEUE_STATE_ERR) { + UMQ_VLOG_ERR("set state only support error\n"); + return -UMQ_ERR_EINVAL; + } + + ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp; + if (queue->state == QUEUE_STATE_ERR) { + UMQ_VLOG_WARN("queue state already in error state\n"); + return UMQ_SUCCESS; + } + + return umq_modify_ubq_to_err(queue); +} + umq_state_t umq_ub_state_get_impl(uint64_t umqh_tp) { ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp; diff --git a/src/urpc/umq/umq_ub/umq_ub_impl.h b/src/urpc/umq/umq_ub/umq_ub_impl.h index 39b25c1..e2ab460 100644 --- a/src/urpc/umq/umq_ub/umq_ub_impl.h +++ b/src/urpc/umq/umq_ub/umq_ub_impl.h @@ -73,6 +73,7 @@ int32_t umq_ub_destroy_impl(uint64_t umqh); int umq_ub_bind_info_get_impl(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); int umq_ub_bind_impl(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); int umq_ub_unbind_impl(uint64_t umqh); +int umq_ub_state_set_impl(uint64_t umqh_tp, umq_state_t state); umq_state_t umq_ub_state_get_impl(uint64_t umqh_tp); int32_t umq_ub_register_memory_impl(void *buf, uint64_t size); diff --git a/src/urpc/umq/umq_ub/umq_ub_plus.c b/src/urpc/umq/umq_ub/umq_ub_plus.c index 5b74592..63bd98e 100644 --- a/src/urpc/umq/umq_ub/umq_ub_plus.c +++ b/src/urpc/umq/umq_ub/umq_ub_plus.c @@ -79,6 +79,11 @@ static int umq_tp_ub_plus_unbind(uint64_t umqh_tp) return umq_ub_unbind_impl(umqh_tp); } +static int umq_tp_ub_plus_state_set(uint64_t umqh_tp, umq_state_t state) +{ + return umq_ub_state_set_impl(umqh_tp, state); +} + static umq_state_t umq_tp_ub_plus_state_get(uint64_t umqh_tp) { return umq_ub_state_get_impl(umqh_tp); @@ -193,6 +198,7 @@ static umq_ops_t g_umq_ub_plus_ops = { .umq_tp_bind_info_get = umq_tp_ub_plus_bind_info_get, .umq_tp_bind = umq_tp_ub_plus_bind, .umq_tp_unbind = umq_tp_ub_plus_unbind, + .umq_tp_state_set = umq_tp_ub_plus_state_set, .umq_tp_state_get = umq_tp_ub_plus_state_get, .umq_tp_log_config_set = umq_tp_ub_plus_log_config_set, .umq_tp_log_config_reset = umq_tp_ub_plus_log_config_reset, -- Gitee