diff --git a/src/urpc/umq/umq_ub/core/flow_control/umq_ub_flow_control.c b/src/urpc/umq/umq_ub/core/flow_control/umq_ub_flow_control.c
index 74eec4108f7f467093242a3fbe66892cb26aaadb..4e94cc4632f19f936eb92791860283f31b38c247 100644
--- a/src/urpc/umq/umq_ub/core/flow_control/umq_ub_flow_control.c
+++ b/src/urpc/umq/umq_ub/core/flow_control/umq_ub_flow_control.c
@@ -236,6 +236,135 @@ static ALWAYS_INLINE void flow_control_stats_query_atomic(struct ub_flow_control
     out->total_flow_controlled_wr = __atomic_load_n(&fc->total_flow_controlled_wr, __ATOMIC_RELAXED);
 }
 
+// Atomically return `count` credits to the shared JFR credit pool, clamping at
+// `capacity`. Credits that fit are accounted in history_total; clamped-off or
+// rejected credits are accounted in history_error_total. Returns the updated
+// idle value, or the unchanged idle value when the uint16 overflow guard
+// rejects the whole post.
+static ALWAYS_INLINE uint16_t available_credit_inc_atomic(ub_shared_jfr_credit_t *sjfr_credit, uint16_t count)
+{
+    uint16_t after, before = __atomic_load_n(&sjfr_credit->idle, __ATOMIC_RELAXED);
+    uint16_t ret = before;
+    uint32_t rx_sum;
+    uint16_t actual_count = count;
+    uint16_t error_count = 0;
+    do {
+        rx_sum = before + count;
+        // Re-derive the accounting on every CAS retry: `before` is reloaded
+        // after a failed CAS, so values from the previous round are stale.
+        actual_count = count;
+        error_count = 0;
+        if (URPC_UNLIKELY(rx_sum > UINT16_MAX)) {
+            UMQ_LIMIT_VLOG_WARN("rx posted exceed UINT16_MAX, current rx %d, new post %d, capacity %d\n",
+                before, count, sjfr_credit->capacity);
+            ret = before;
+            break;
+        }
+
+        if (URPC_UNLIKELY(rx_sum > sjfr_credit->capacity)) {
+            UMQ_LIMIT_VLOG_WARN("rx posted exceed rx depth, current win %d, new win %d, capacity %d\n",
+                before, count, sjfr_credit->capacity);
+            // Compute the overshoot BEFORE clamping rx_sum: clamping first
+            // made (rx_sum - capacity) always 0, so the clamped-off credits
+            // were silently counted as successfully posted.
+            error_count = (uint16_t)(rx_sum - sjfr_credit->capacity);
+            actual_count = count - error_count;
+            rx_sum = sjfr_credit->capacity;
+        }
+        after = (uint16_t)rx_sum;
+        ret = after;
+    } while (
+        !__atomic_compare_exchange_n(&sjfr_credit->idle, &before, after, true, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE));
+
+    // rx_sum > UINT16_MAX holds only on the rejected (break) path. The old
+    // `ret == before` test could not distinguish that path from a clamp at
+    // full capacity, where after == before as well.
+    if (URPC_UNLIKELY(rx_sum > UINT16_MAX)) {
+        (void)__atomic_add_fetch(&sjfr_credit->history_error_total, actual_count, __ATOMIC_RELAXED);
+    } else {
+        (void)__atomic_add_fetch(&sjfr_credit->history_error_total, error_count, __ATOMIC_RELAXED);
+        (void)__atomic_add_fetch(&sjfr_credit->history_total, actual_count, __ATOMIC_RELAXED);
+    }
+
+    return ret;
+}
+
+static ALWAYS_INLINE 
uint16_t available_credit_dec_atomic(ub_shared_jfr_credit_t *sjfr_credit, uint16_t count)
+{
+    // Atomically take up to `count` credits from the pool; returns how many
+    // credits were actually taken (0 when the pool is empty).
+    uint16_t after, before = __atomic_load_n(&sjfr_credit->idle, __ATOMIC_RELAXED);
+    uint16_t ret = before;
+    do {
+        if (URPC_UNLIKELY(before == 0)) {
+            ret = 0;
+            break;
+        }
+
+        after = before > count ? before - count : 0;
+        ret = before - after;
+    } while (
+        !__atomic_compare_exchange_n(&sjfr_credit->idle, &before, after, true, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE));
+
+    return ret;
+}
+
+// Atomic read of the currently idle credit count.
+static ALWAYS_INLINE uint16_t available_credit_load_atomic(ub_shared_jfr_credit_t *sjfr_credit)
+{
+    return __atomic_load_n(&sjfr_credit->idle, __ATOMIC_RELAXED);
+}
+
+// Single-threaded variant of available_credit_inc_atomic; same accounting.
+static ALWAYS_INLINE uint16_t available_credit_inc_non_atomic(ub_shared_jfr_credit_t *sjfr_credit, uint16_t count)
+{
+    uint32_t rx_sum = sjfr_credit->idle + count;
+    if (URPC_UNLIKELY(rx_sum > UINT16_MAX)) {
+        UMQ_LIMIT_VLOG_WARN("rx posted exceed UINT16_MAX, current rx %d, new post %d, capacity %d\n",
+            sjfr_credit->idle, count, sjfr_credit->capacity);
+        sjfr_credit->history_error_total += count;
+        return sjfr_credit->idle;
+    }
+
+    if (URPC_UNLIKELY(rx_sum > sjfr_credit->capacity)) {
+        UMQ_LIMIT_VLOG_WARN("rx posted exceed rx depth, current win %d, new win %d, capacity %d\n",
+            sjfr_credit->idle, count, sjfr_credit->capacity);
+        // Account the overshoot BEFORE clamping rx_sum: clamping first made
+        // (rx_sum - capacity) always 0, crediting the full count as success
+        // and recording no error.
+        sjfr_credit->history_total += (count - (rx_sum - sjfr_credit->capacity));
+        sjfr_credit->history_error_total += (rx_sum - sjfr_credit->capacity);
+        rx_sum = sjfr_credit->capacity;
+    } else {
+        sjfr_credit->history_total += count;
+    }
+    sjfr_credit->idle = (uint16_t)rx_sum;
+    return sjfr_credit->idle;
+}
+
+// Single-threaded variant of available_credit_dec_atomic; returns the number
+// of credits actually taken (all of `count`, or whatever idle credit is left).
+static ALWAYS_INLINE uint16_t available_credit_dec_non_atomic(ub_shared_jfr_credit_t *sjfr_credit, uint16_t count)
+{
+    uint16_t idle = sjfr_credit->idle;
+    if (idle < count) {
+        sjfr_credit->idle = 0;
+        return idle;
+    }
+
+    sjfr_credit->idle -= count;
+    return count;
+}
+
+// Plain (non-atomic) read of the currently idle credit count.
+static ALWAYS_INLINE uint16_t available_credit_load_non_atomic(ub_shared_jfr_credit_t *sjfr_credit)
+{
+    return 
sjfr_credit->idle;
+}
+
+// Initialize the shared-JFR credit pool that lives in the main queue's
+// jfr_ctx; sub queues reuse it. `feature` is currently unused here but kept
+// for signature symmetry with umq_ub_flow_control_init.
+static void umq_ub_shared_jfr_credit_init(ub_queue_t *queue, uint32_t feature, umq_flow_control_cfg_t *cfg)
+{
+    ub_shared_jfr_credit_t *shared_jfr_credit = &queue->jfr_ctx->credit;
+    memset(shared_jfr_credit, 0, sizeof(ub_shared_jfr_credit_t));
+    // NOTE(review): capacity was never initialized anywhere in this patch, so
+    // the memset above left it 0 and every credit increment clamped to 0.
+    // rx_depth is the natural bound -- confirm it always fits in uint16_t.
+    shared_jfr_credit->capacity = (uint16_t)queue->rx_depth;
+    if (cfg->use_atomic_window) {
+        shared_jfr_credit->ops.available_credit_inc = available_credit_inc_atomic;
+        shared_jfr_credit->ops.available_credit_dec = available_credit_dec_atomic;
+        shared_jfr_credit->ops.available_credit_load = available_credit_load_atomic;
+    } else {
+        shared_jfr_credit->ops.available_credit_inc = available_credit_inc_non_atomic;
+        shared_jfr_credit->ops.available_credit_dec = available_credit_dec_non_atomic;
+        shared_jfr_credit->ops.available_credit_load = available_credit_load_non_atomic;
+    }
+}
+
 int umq_ub_flow_control_init(ub_flow_control_t *fc, ub_queue_t *queue, uint32_t feature, umq_flow_control_cfg_t *cfg)
 {
     memset(fc, 0, sizeof(ub_flow_control_t));
@@ -244,6 +361,10 @@ int umq_ub_flow_control_init(ub_flow_control_t *fc, ub_queue_t *queue, uint32_t
         return UMQ_SUCCESS;
     }
 
+    if ((queue->create_flag & UMQ_CREATE_FLAG_SUB_UMQ) == 0) {
+        // main queue initializes shared jfr credit; sub queues share it
+        umq_ub_shared_jfr_credit_init(queue, feature, cfg);
+    }
     fc->local_rx_depth = queue->rx_depth;
     fc->local_tx_depth = queue->tx_depth;
     fc->initial_window = cfg->initial_window;
@@ -363,6 +484,12 @@ void umq_ub_rq_posted_notifier_update(ub_flow_control_t *fc, ub_queue_t *queue,
         return;
     }
 
+    if ((queue->create_flag & UMQ_CREATE_FLAG_SUB_UMQ) != 0) {
+        // sub queue: return the posted buffers to the shared jfr credit pool
+        ub_shared_jfr_credit_t *shared_credit = &queue->jfr_ctx->credit;
+        shared_credit->ops.available_credit_inc(shared_credit, rx_posted);
+        return;
+    }
     uint16_t notify = fc->ops.local_rx_posted_inc(fc, rx_posted);
     if (queue->bind_ctx == NULL) {
         return;
diff --git a/src/urpc/umq/umq_ub/core/private/umq_ub_private.h b/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
index 
c4f757b4d544181d3f3044b6ed78992f70374c88..0ee2309b962509f644d651eb89cd351510e49d7b 100644
--- a/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
+++ b/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
@@ -207,12 +207,32 @@ typedef struct ub_bind_ctx {
     uint64_t remote_notify_addr;
 } ub_bind_ctx_t;
 
+struct ub_shared_jfr_credit;
+typedef struct ub_shared_jfr_credit_ops {
+    // return credits to the pool after rx buffers are posted to the shared jfr
+    uint16_t (*available_credit_inc)(struct ub_shared_jfr_credit *shared_credit, uint16_t count);
+    // read the currently idle credit count
+    uint16_t (*available_credit_load)(struct ub_shared_jfr_credit *shared_credit);
+    // take (distribute) up to `count` credits from the pool in one batch
+    uint16_t (*available_credit_dec)(struct ub_shared_jfr_credit *shared_credit, uint16_t count);
+} ub_shared_jfr_credit_ops_t;
+
+typedef struct ub_shared_jfr_credit {
+    ub_shared_jfr_credit_ops_t ops;
+    volatile uint64_t history_total; // history total credit
+    volatile uint64_t history_error_total;
+    volatile uint64_t history_consumed; // history consumed (allocated) credit; NOTE(review): never written in this patch -- confirm
+    volatile uint16_t idle; // current available credit
+    uint16_t capacity;
+} ub_shared_jfr_credit_t;
+
 typedef struct jfr_ctx {
     urma_jfr_t *jfr;
     urma_jfc_t *jfr_jfc;
     urma_jfce_t *jfr_jfce;
     volatile uint32_t ref_cnt;
     rx_buf_ctx_list_t rx_buf_ctx_list;
+    ub_shared_jfr_credit_t credit;
 } jfr_ctx_t;
 
 typedef struct ub_queue {
diff --git a/src/urpc/umq/umq_ub/core/umq_ub_impl.c b/src/urpc/umq/umq_ub/core/umq_ub_impl.c
index 99377accdbdffb0ae2109588e674f51969107aae..dbf9e2c0154c3dd1973c8f731e18867ad6e95427 100644
--- a/src/urpc/umq/umq_ub/core/umq_ub_impl.c
+++ b/src/urpc/umq/umq_ub/core/umq_ub_impl.c
@@ -578,20 +578,20 @@ uint64_t umq_ub_create_impl(uint64_t umqh, uint8_t *ctx, umq_create_option_t *op
         }
     }
 
-    if (umq_ub_flow_control_init(&queue->flow_control, queue, dev_ctx->feature, &dev_ctx->flow_control) !=
-        UMQ_SUCCESS) {
+    if (umq_ub_jfr_ctx_create(queue, dev_ctx, option, share_rq) != UMQ_SUCCESS) {
         goto FREE_QUEUE;
     }
 
-    if(umq_ub_jfr_ctx_create(queue, 
dev_ctx, option, share_rq) != UMQ_SUCCESS) {
-        goto UNINIT_FLOW_CONTROL;
+    if (umq_ub_flow_control_init(&queue->flow_control, queue, dev_ctx->feature, &dev_ctx->flow_control) !=
+        UMQ_SUCCESS) {
+        goto DESTROY_JFR_CTX; // jfr ctx now exists first (flow control reads jfr_ctx->credit), so unwind it
     }
 
     if (queue->mode == UMQ_MODE_INTERRUPT) {
         queue->jfs_jfce = urma_create_jfce(dev_ctx->urma_ctx);
         if (queue->jfs_jfce == NULL) {
             UMQ_VLOG_ERR("create jfs_jfce failed\n");
-            goto DESTROY_JFR_CTX;
+            goto UNINIT_FLOW_CONTROL; // flow control was initialized before the jfce
         }
     }
 
@@ -639,10 +639,13 @@ DELETE_JFCE:
     if (queue->mode == UMQ_MODE_INTERRUPT) {
         (void)urma_delete_jfce(queue->jfs_jfce);
     }
-DESTROY_JFR_CTX:
-    umq_ub_jfr_ctx_destroy(queue);
+
 UNINIT_FLOW_CONTROL:
     umq_ub_flow_control_uninit(&queue->flow_control);
+
+DESTROY_JFR_CTX: // moved after flow control uninit: teardown mirrors reverse creation order
+    umq_ub_jfr_ctx_destroy(queue);
+
 FREE_QUEUE:
     umq_dec_ref(dev_ctx->io_lock_free, &dev_ctx->ref_cnt, 1);
     free(queue);