diff --git a/src/urpc/include/umq/transport_layer/umq_tp_api.h b/src/urpc/include/umq/transport_layer/umq_tp_api.h index 55ad817b0da3c2037157ae9584a1d6d1bda6ce6d..b2a5b435abcdd45ff846aebcca9cb77303e8dce1 100644 --- a/src/urpc/include/umq/transport_layer/umq_tp_api.h +++ b/src/urpc/include/umq/transport_layer/umq_tp_api.h @@ -79,6 +79,15 @@ typedef struct umq_ops { */ int (*umq_tp_unbind)(uint64_t umqh_tp); + /** + * User should ensure thread safety if io_lock_free is true + * Set umq state + * @param[in] umqh_tp: umq handle + * @param[in] state: umq state want to set(Only Support Set ERR STATE) + * Return 0 on success, error code on failure + */ + int (*umq_tp_state_set)(uint64_t umqh_tp, umq_state_t state); + /** * User should ensure thread safety if io_lock_free is true * Query umq state diff --git a/src/urpc/include/umq/umq_api.h b/src/urpc/include/umq/umq_api.h index 44f4d7b582db48ada89df408b338d6b68b5894c9..992bab2872c376067cf830d2a2b7baa30ea7f6cf 100644 --- a/src/urpc/include/umq/umq_api.h +++ b/src/urpc/include/umq/umq_api.h @@ -79,6 +79,15 @@ int umq_bind(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); */ int umq_unbind(uint64_t umqh); +/** + * User should ensure thread safety if io_lock_free is true + * Set umq state + * @param[in] umqh: umq handle + * @param[in] state: umq state want to set (Only Support Set ERR STATE) + * Return 0 on success, error code on failure + */ +int umq_state_set(uint64_t umqh, umq_state_t state); + /** * User should ensure thread safety if io_lock_free is true * Query umq state diff --git a/src/urpc/umq/umq_api.c b/src/urpc/umq/umq_api.c index 1642657b055ba8bca85431c31f44e545946573b6..433c4a804df97903ed7d38fb9666dc70aa866553 100644 --- a/src/urpc/umq/umq_api.c +++ b/src/urpc/umq/umq_api.c @@ -889,6 +889,17 @@ int umq_buf_split(umq_buf_t *head, umq_buf_t *node) return UMQ_SUCCESS; } +int umq_state_set(uint64_t umqh, umq_state_t state) +{ + umq_t *umq = (umq_t *)(uintptr_t)umqh; + if (umq == NULL || umq->umqh_tp == UMQ_INVALID_HANDLE || umq->tp_ops->umq_tp_state_set == NULL) { + UMQ_VLOG_ERR("parameter invalid\n"); + return -UMQ_ERR_EINVAL; + } + + return umq->tp_ops->umq_tp_state_set(umq->umqh_tp, state); +} + umq_state_t umq_state_get(uint64_t umqh) { umq_t *umq = (umq_t *)(uintptr_t)umqh; diff --git a/src/urpc/umq/umq_ub/umq_ub.c b/src/urpc/umq/umq_ub/umq_ub.c index b67a712f749272233ac74b4974f008dab3599d0d..9aed7a71a1ff855d41f84d4c339bf2ca35f2ce17 100644 --- a/src/urpc/umq/umq_ub/umq_ub.c +++ b/src/urpc/umq/umq_ub/umq_ub.c @@ -69,6 +69,11 @@ static int umq_tp_ub_unbind(uint64_t umqh_tp) return umq_ub_unbind_impl(umqh_tp); } +static int umq_tp_ub_state_set(uint64_t umqh_tp, umq_state_t state) +{ + return umq_ub_state_set_impl(umqh_tp, state); +} + static umq_state_t umq_tp_ub_state_get(uint64_t umqh_tp) { return umq_ub_state_get_impl(umqh_tp); @@ -179,6 +184,7 @@ static umq_ops_t g_umq_ub_ops = { .umq_tp_bind_info_get = umq_tp_ub_bind_info_get, .umq_tp_bind = umq_tp_ub_bind, .umq_tp_unbind = umq_tp_ub_unbind, + .umq_tp_state_set = umq_tp_ub_state_set, .umq_tp_state_get = umq_tp_ub_state_get, .umq_tp_log_config_set = umq_tp_ub_log_config_set, .umq_tp_log_config_reset = umq_tp_ub_log_config_reset, diff --git a/src/urpc/umq/umq_ub/umq_ub_impl.c b/src/urpc/umq/umq_ub/umq_ub_impl.c index 5301dffc6d82f92f8bf45d6963c33022fc859606..893c16efaccbc498ae1cfc95fd5321bdba28e378 100644 --- a/src/urpc/umq/umq_ub/umq_ub_impl.c +++ b/src/urpc/umq/umq_ub/umq_ub_impl.c @@ -4364,6 +4364,23 @@ util_id_allocator_t *umq_ub_get_msg_id_generator(uint64_t umqh_tp) return &g_umq_ub_id_allocator; } +int umq_ub_state_set_impl(uint64_t umqh_tp, umq_state_t state) +{ + // todo add only support main umq judgement + if (state != QUEUE_STATE_ERR) { + UMQ_VLOG_ERR("set state only support error\n"); + return -UMQ_ERR_EINVAL; + } + + ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp; + if (queue->state == QUEUE_STATE_ERR) { + UMQ_VLOG_WARN("queue state already in error state\n"); + return UMQ_SUCCESS; + } + + return umq_modify_ubq_to_err(queue); +} + umq_state_t umq_ub_state_get_impl(uint64_t umqh_tp) { ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp; diff --git a/src/urpc/umq/umq_ub/umq_ub_impl.h b/src/urpc/umq/umq_ub/umq_ub_impl.h index 39b25c1263d3f40d69fb8c8247c63954b9f431f5..e2ab460c8e9bfad1eeaf4a2cee4f800cf62033b6 100644 --- a/src/urpc/umq/umq_ub/umq_ub_impl.h +++ b/src/urpc/umq/umq_ub/umq_ub_impl.h @@ -73,6 +73,7 @@ int32_t umq_ub_destroy_impl(uint64_t umqh); int umq_ub_bind_info_get_impl(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); int umq_ub_bind_impl(uint64_t umqh, uint8_t *bind_info, uint32_t bind_info_size); int umq_ub_unbind_impl(uint64_t umqh); +int umq_ub_state_set_impl(uint64_t umqh_tp, umq_state_t state); umq_state_t umq_ub_state_get_impl(uint64_t umqh_tp); int32_t umq_ub_register_memory_impl(void *buf, uint64_t size); diff --git a/src/urpc/umq/umq_ub/umq_ub_plus.c b/src/urpc/umq/umq_ub/umq_ub_plus.c index 5b7459204f94d532fab305e989776c496d1890b0..63bd98e05d2dc68bd425b23fcef88b9fca56a647 100644 --- a/src/urpc/umq/umq_ub/umq_ub_plus.c +++ b/src/urpc/umq/umq_ub/umq_ub_plus.c @@ -79,6 +79,11 @@ static int umq_tp_ub_plus_unbind(uint64_t umqh_tp) return umq_ub_unbind_impl(umqh_tp); } +static int umq_tp_ub_plus_state_set(uint64_t umqh_tp, umq_state_t state) +{ + return umq_ub_state_set_impl(umqh_tp, state); +} + static umq_state_t umq_tp_ub_plus_state_get(uint64_t umqh_tp) { return umq_ub_state_get_impl(umqh_tp); @@ -193,6 +198,7 @@ static umq_ops_t g_umq_ub_plus_ops = { .umq_tp_bind_info_get = umq_tp_ub_plus_bind_info_get, .umq_tp_bind = umq_tp_ub_plus_bind, .umq_tp_unbind = umq_tp_ub_plus_unbind, + .umq_tp_state_set = umq_tp_ub_plus_state_set, .umq_tp_state_get = umq_tp_ub_plus_state_get, .umq_tp_log_config_set = umq_tp_ub_plus_log_config_set, .umq_tp_log_config_reset = umq_tp_ub_plus_log_config_reset,