diff --git a/src/urpc/umq/umq_ub/core/flow_control/umq_ub_flow_control.c b/src/urpc/umq/umq_ub/core/flow_control/umq_ub_flow_control.c
index 74eec4108f7f467093242a3fbe66892cb26aaadb..84a80a60b001bedd8508d31582c5be967814303c 100644
--- a/src/urpc/umq/umq_ub/core/flow_control/umq_ub_flow_control.c
+++ b/src/urpc/umq/umq_ub/core/flow_control/umq_ub_flow_control.c
@@ -338,6 +338,7 @@ void umq_ub_window_read(ub_flow_control_t *fc, ub_queue_t *queue)
         .tjetty = queue->bind_ctx->tjetty};
     urma_status_t status = urma_post_jetty_send_wr(queue->jetty, &urma_wr, &bad_wr);
     if (status == URMA_SUCCESS) {
+        umq_inc_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, 1);
         fc->remote_get = true;
         return;
     }
@@ -417,6 +418,7 @@ void umq_ub_rq_posted_notifier_update(ub_flow_control_t *fc, ub_queue_t *queue,
     urma_jfs_wr_t *bad_wr = NULL;
     urma_status_t status = urma_post_jetty_send_wr(queue->jetty, &urma_wr, &bad_wr);
     if (status == URMA_SUCCESS) {
+        umq_inc_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, 1);
         return;
     }
diff --git a/src/urpc/umq/umq_ub/core/private/umq_pro_ub.c b/src/urpc/umq/umq_ub/core/private/umq_pro_ub.c
index 1e141e7e7ec6bd06856eb93feead1124cd2a1fd7..c48228175e0c0e67162dd19d3622e496fa100213 100644
--- a/src/urpc/umq/umq_ub/core/private/umq_pro_ub.c
+++ b/src/urpc/umq/umq_ub/core/private/umq_pro_ub.c
@@ -275,9 +275,11 @@ int umq_ub_post_tx(uint64_t umqh, umq_buf_t *qbuf, umq_buf_t **bad_qbuf)
         }
     }
     (urma_wr_ptr - 1)->next = NULL;
-    max_tx = opcode_consume_rqe ? umq_ub_window_dec(&queue->flow_control, queue, wr_index) : wr_index;
+    uint32_t inc_tx = umq_inc_tx_ref(queue, wr_index);
+    max_tx = opcode_consume_rqe ? umq_ub_window_dec(&queue->flow_control, queue, inc_tx) : inc_tx;
     if (max_tx == 0) {
         *bad_qbuf = qbuf;
+        umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, inc_tx);
         ret = -UMQ_ERR_EAGAIN;
         goto ERROR;
     } else if (max_tx < wr_index) {
@@ -291,9 +293,10 @@ int umq_ub_post_tx(uint64_t umqh, umq_buf_t *qbuf, umq_buf_t **bad_qbuf)
     if (status != URMA_SUCCESS) {
         ret = -(int)status;
         if (bad_wr != NULL) {
-            *bad_qbuf = (umq_buf_t *)(uintptr_t)bad_wr->user_ctx;
+            process_bad_qbuf(bad_wr, bad_qbuf, qbuf, queue, inc_tx);
         } else {
             *bad_qbuf = qbuf;
+            umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, inc_tx);
         }
         UMQ_LIMIT_VLOG_ERR("urma_post_jetty_send_wr failed, status %d, local eid: " EID_FMT ", "
             "local jetty_id: %u, remote eid: " EID_FMT ", remote jetty_id: %u\n", (int)status,
@@ -305,6 +308,7 @@
     if (max_tx < wr_index) {
         *bad_qbuf = (umq_buf_t *)(uintptr_t)urma_wr[max_tx].user_ctx;
         umq_ub_recover_tx_imm(queue, urma_wr, wr_index, *bad_qbuf);
+        umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, inc_tx - max_tx);
         umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1);
         return -UMQ_ERR_EAGAIN;
     }
@@ -779,15 +783,18 @@ int umq_ub_poll_tx(uint64_t umqh, umq_buf_t **buf, uint32_t buf_count)
             // recover flow control window and rx_posted
             if (cr[i].user_ctx == 0) {
                 queue->flow_control.remote_get = false;
+                umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, 1);
                 UMQ_LIMIT_VLOG_ERR("get remote window post read failed\n");
                 continue;
             } else if (cr[i].user_ctx <= UINT16_MAX) {
                 umq_ub_window_inc(&queue->flow_control, 1);
+                umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, 1);
                 umq_ub_rq_posted_notifier_inc(&queue->flow_control, (uint16_t)cr[i].user_ctx);
                 continue;
             }
         }
+        umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, 1);
         if (cr[i].user_ctx == 0) { // window read ok
             uint16_t *remote_win =
diff --git a/src/urpc/umq/umq_ub/core/private/umq_ub.c b/src/urpc/umq/umq_ub/core/private/umq_ub.c
index 8513466606591ba3373f6cbf0eec97d38b348bed..83d6f66b7d3d05788c7a027468b44c31a07b828d 100644
--- a/src/urpc/umq/umq_ub/core/private/umq_ub.c
+++ b/src/urpc/umq/umq_ub/core/private/umq_ub.c
@@ -1402,7 +1402,7 @@ FREE_BUF:
     return UMQ_FAIL;
 }
 
-int umq_ub_plus_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_wr_ptr, uint32_t remain_tx)
+int umq_ub_plus_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_wr_ptr)
 {
     uint32_t max_sge_num = queue->max_tx_sge;
     urma_sge_t sges[UMQ_POST_POLL_BATCH][max_sge_num];
@@ -1478,10 +1478,10 @@ int umq_ub_plus_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *
         urma_wr_ptr++;
         (urma_wr_ptr - 1)->next = urma_wr_ptr;
         wr_index++;
-        if ((wr_index == remain_tx || wr_index == UMQ_POST_POLL_BATCH) && buffer != NULL) {
-            // wr count exceeds remain_tx or UMQ_POST_POLL_BATCH
-            UMQ_LIMIT_VLOG_ERR("wr count %u exceeds remain_tx %u or max_post_size %d, not supported\n", wr_index,
-                remain_tx, UMQ_POST_POLL_BATCH);
+        if (wr_index == UMQ_POST_POLL_BATCH && buffer != NULL) {
+            // wr count exceeds UMQ_POST_POLL_BATCH
+            UMQ_LIMIT_VLOG_ERR("wr count %u exceeds max_post_size %d, not supported\n", wr_index,
+                UMQ_POST_POLL_BATCH);
             return -UMQ_ERR_EINVAL;
         }
     }
@@ -1657,7 +1657,7 @@ int umq_ub_dequeue_plus_with_poll_rx(uint64_t umqh_tp, urma_cr_t *cr, umq_buf_t
     return qbuf_cnt;
 }
 
-void process_bad_qbuf(urma_jfs_wr_t *bad_wr, umq_buf_t **bad_qbuf, umq_buf_t *qbuf, ub_queue_t *queue)
+void process_bad_qbuf(urma_jfs_wr_t *bad_wr, umq_buf_t **bad_qbuf, umq_buf_t *qbuf, ub_queue_t *queue, uint32_t inc_tx)
 {
     *bad_qbuf = (umq_buf_t *)(uintptr_t)bad_wr->user_ctx;
     umq_buf_t *tmp_qbuf = qbuf;
@@ -1665,14 +1665,20 @@ void process_bad_qbuf(urma_jfs_wr_t *bad_wr, umq_buf_t **bad_qbuf, umq_buf_t *qb
     umq_buf_t *previous = NULL;
     while (tmp_qbuf != NULL && tmp_qbuf != *bad_qbuf) {
         count++;
-        previous = tmp_qbuf;
-        tmp_qbuf = tmp_qbuf->qbuf_next;
+        uint32_t rest_data_size = tmp_qbuf->total_data_size;
+        while (tmp_qbuf && rest_data_size > 0) {
+            if (rest_data_size <= tmp_qbuf->data_size) {
+                previous = tmp_qbuf;
+            }
+            rest_data_size -= tmp_qbuf->data_size;
+            tmp_qbuf = tmp_qbuf->qbuf_next;
+        }
     }
     if (previous) {
         // break chain of succeed qbuf and failed qbuf on tx
         previous->qbuf_next = NULL;
     }
-    umq_inc_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, count);
+    umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, inc_tx - count);
 }
 
 void umq_ub_enqueue_with_poll_tx(ub_queue_t *queue, umq_buf_t **buf)
@@ -1847,7 +1853,7 @@ int umq_ub_write_imm(uint64_t umqh_tp, uint64_t target_addr, uint32_t len, uint6
     return UMQ_SUCCESS;
 }
 
-int umq_ub_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_wr_ptr, uint32_t remain_tx)
+int umq_ub_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_wr_ptr)
 {
     uint32_t max_sge_num = queue->max_tx_sge;
     urma_sge_t sges[UMQ_POST_POLL_BATCH][max_sge_num];
@@ -1911,10 +1917,10 @@ int umq_ub_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_
         urma_wr_ptr++;
         (urma_wr_ptr - 1)->next = urma_wr_ptr;
         wr_index++;
-        if ((wr_index == remain_tx || wr_index == UMQ_POST_POLL_BATCH) && buffer != NULL) {
-            // wr count exceeds remain_tx or UMQ_POST_POLL_BATCH
-            UMQ_LIMIT_VLOG_ERR("wr count %u exceeds remain_tx %u or max_post_size %d, not supported\n", wr_index,
-                remain_tx, UMQ_POST_POLL_BATCH);
+        if (wr_index == UMQ_POST_POLL_BATCH && buffer != NULL) {
+            // wr count exceeds UMQ_POST_POLL_BATCH
+            UMQ_LIMIT_VLOG_ERR("wr count %u exceeds max_post_size %d, not supported\n", wr_index,
+                UMQ_POST_POLL_BATCH);
             return -UMQ_ERR_EINVAL;
         }
     }
@@ -1958,4 +1964,33 @@ void umq_flush_tx(ub_queue_t *queue, uint32_t max_retry_times)
         }
         retry_times++;
     }
+}
+
+/* Reserve up to required_tx send slots with a CAS loop; returns the number actually reserved. */
+static uint32_t umq_inc_tx_ref_atomic(ub_queue_t *queue, uint32_t required_tx)
+{
+    uint32_t after, before, inc_tx = 0;
+    uint32_t tx_depth = queue->tx_depth;
+    do {
+        before = __atomic_load_n(&queue->tx_outstanding, __ATOMIC_RELAXED);
+        if (URPC_UNLIKELY(before == tx_depth)) {
+            inc_tx = 0;
+            break;
+        }
+
+        after = before + required_tx > tx_depth ? tx_depth : before + required_tx;
+        inc_tx = after - before;
+    } while (
+        !__atomic_compare_exchange_n(&queue->tx_outstanding, &before, after, true, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE));
+    return inc_tx;
+}
+
+/* Reserve up to n send slots against tx_depth; plain read-modify-write on the lock-free path. */
+uint32_t umq_inc_tx_ref(ub_queue_t *queue, uint32_t n)
+{
+    if (queue->dev_ctx->io_lock_free) {
+        uint32_t old_value = queue->tx_outstanding;
+        queue->tx_outstanding = old_value + n >= queue->tx_depth ? queue->tx_depth : old_value + n;
+        return queue->tx_outstanding - old_value;
+    } else {
+        return umq_inc_tx_ref_atomic(queue, n);
+    }
+}
\ No newline at end of file
diff --git a/src/urpc/umq/umq_ub/core/private/umq_ub_private.h b/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
index 72cb51c1ed3063dffa717e3b30024c09b092c358..25472b3585e0666899ccca0caa3a9b22fd845027 100644
--- a/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
+++ b/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
@@ -330,14 +330,14 @@ util_id_allocator_t *umq_ub_id_allocator_get(void);
 
 umq_buf_t *umq_ub_read_ctx_create(ub_queue_t *queue, umq_imm_head_t *umq_imm_head, uint16_t buf_num,
     uint16_t msg_id);
-int umq_ub_plus_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_wr_ptr, uint32_t remain_tx);
+int umq_ub_plus_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_wr_ptr);
 int umq_ub_dequeue_plus_with_poll_tx(ub_queue_t *queue, urma_cr_t *cr, umq_buf_t **buf, int return_rx_cnt);
 void fill_big_data_ref_sge(ub_queue_t *queue, ub_ref_sge_t *ref_sge, umq_buf_t *buffer,
     ub_import_mempool_info_t *import_mempool_info, umq_imm_head_t *umq_imm_head);
 void umq_ub_fill_rx_buffer(ub_queue_t *queue, int rx_cnt);
 int umq_ub_dequeue_with_poll_rx(ub_queue_t *queue, urma_cr_t *cr, umq_buf_t **buf);
 int umq_ub_dequeue_plus_with_poll_rx(uint64_t umqh_tp, urma_cr_t *cr, umq_buf_t **buf);
-void process_bad_qbuf(urma_jfs_wr_t *bad_wr, umq_buf_t **bad_qbuf, umq_buf_t *qbuf, ub_queue_t *queue);
+void process_bad_qbuf(urma_jfs_wr_t *bad_wr, umq_buf_t **bad_qbuf, umq_buf_t *qbuf, ub_queue_t *queue, uint32_t inc_tx);
 void umq_ub_enqueue_with_poll_tx(ub_queue_t *queue, umq_buf_t **buf);
 void umq_ub_enqueue_plus_with_poll_tx(ub_queue_t *queue, umq_buf_t **buf);
 void umq_flush_rx(ub_queue_t *queue, uint32_t max_retry_times);
@@ -346,7 +346,8 @@ void ub_fill_umq_imm_head(umq_imm_head_t *umq_imm_head, umq_buf_t *buffer);
 int umq_ub_send_imm(ub_queue_t *queue, uint64_t imm_value, urma_sge_t *sge, uint64_t user_ctx);
 int umq_ub_write_imm(uint64_t umqh_tp, uint64_t target_addr, uint32_t len, uint64_t imm_value);
 int umq_ub_read(uint64_t umqh_tp, umq_buf_t *rx_buf, umq_ub_imm_t imm);
-int umq_ub_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_wr_ptr, uint32_t remain_tx);
+int umq_ub_fill_wr_impl(umq_buf_t *qbuf, ub_queue_t *queue, urma_jfs_wr_t *urma_wr_ptr);
+uint32_t umq_inc_tx_ref(ub_queue_t *queue, uint32_t n);
 
 #ifdef __cplusplus
 }
diff --git a/src/urpc/umq/umq_ub/core/umq_ub_impl.c b/src/urpc/umq/umq_ub/core/umq_ub_impl.c
index ee0f3ac803339edd24edf08e217a348a783b1eec..ad02e3d09c381862ed5639429d73e0455145ce69 100644
--- a/src/urpc/umq/umq_ub/core/umq_ub_impl.c
+++ b/src/urpc/umq/umq_ub/core/umq_ub_impl.c
@@ -886,32 +886,43 @@ int32_t umq_ub_enqueue_impl(uint64_t umqh_tp, umq_buf_t *qbuf, umq_buf_t **bad_q
     *bad_qbuf = NULL;
     int ret = UMQ_SUCCESS;
 
-    uint32_t tx_outstanding = umq_fetch_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding);
-    int remain_tx = queue->tx_depth - tx_outstanding;
-    if (remain_tx <= 0) {
-        ret = -UMQ_ERR_EAGAIN;
-        goto ERROR;
-    }
-    int wr_num = umq_ub_fill_wr_impl(qbuf, queue, urma_wr, (uint32_t)remain_tx);
+    int wr_num = umq_ub_fill_wr_impl(qbuf, queue, urma_wr);
     if (wr_num < 0) {
         *bad_qbuf = qbuf;
         ret = wr_num;
         goto ERROR;
     }
+
+    uint32_t inc_tx = umq_inc_tx_ref(queue, wr_num);
+    if (inc_tx == 0) {
+        *bad_qbuf = qbuf;
+        ret = -UMQ_ERR_EAGAIN;
+        goto ERROR;
+    } else if (inc_tx < (uint32_t)wr_num) {
+        urma_wr[inc_tx - 1].next = NULL;
+    }
 
     urma_jfs_wr_t *bad_wr = NULL;
     uint64_t start_timestamp = umq_perf_get_start_timestamp_with_feature(queue->dev_ctx->feature);
     urma_status_t status = urma_post_jetty_send_wr(queue->jetty, urma_wr, &bad_wr);
     umq_perf_record_write_with_feature(UMQ_PERF_RECORD_TRANSPORT_POST_SEND, start_timestamp, queue->dev_ctx->feature);
     if (status != URMA_SUCCESS) {
         if (bad_wr != NULL) {
-            process_bad_qbuf(bad_wr, bad_qbuf, qbuf, queue);
+            process_bad_qbuf(bad_wr, bad_qbuf, qbuf, queue, inc_tx);
+        } else {
+            *bad_qbuf = qbuf;
+            umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, inc_tx);
         }
         UMQ_LIMIT_VLOG_ERR("urma_post_jetty_send_wr failed, status %d\n", status);
         ret = -status;
         goto ERROR;
     }
-    umq_inc_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, wr_num);
+    if (inc_tx < (uint32_t)wr_num) {
+        *bad_qbuf = (umq_buf_t *)(uintptr_t)urma_wr[inc_tx].user_ctx;
+        ret = -UMQ_ERR_EAGAIN;
+        goto ERROR;
+    }
+
     umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1);
     return ret;
@@ -934,15 +945,9 @@ int32_t umq_ub_enqueue_impl_plus(uint64_t umqh_tp, umq_buf_t *qbuf, umq_buf_t **
     umq_inc_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1);
     umq_ub_enqueue_plus_with_poll_tx(queue, buf);
 
-    uint32_t tx_outstanding = umq_fetch_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding);
-    int remain_tx = queue->tx_depth - tx_outstanding;
-    if (remain_tx <= 0) {
-        ret = -UMQ_ERR_EAGAIN;
-        goto DEC_REF;
-    }
     urma_jfs_wr_t urma_wr[UMQ_POST_POLL_BATCH];
-    int wr_num = umq_ub_plus_fill_wr_impl(qbuf, queue, urma_wr, (uint32_t)remain_tx);
+    int wr_num = umq_ub_plus_fill_wr_impl(qbuf, queue, urma_wr);
     if (wr_num < 0) {
         *bad_qbuf = qbuf;
         ret = wr_num;
@@ -951,20 +956,37 @@ int32_t umq_ub_enqueue_impl_plus(uint64_t umqh_tp, umq_buf_t *qbuf, umq_buf_t **
         ret = UMQ_SUCCESS;
         goto DEC_REF;
     }
+
+    uint32_t inc_tx = umq_inc_tx_ref(queue, wr_num);
+    if (inc_tx == 0) {
+        *bad_qbuf = qbuf;
+        ret = -UMQ_ERR_EAGAIN;
+        goto DEC_REF;
+    } else if (inc_tx < (uint32_t)wr_num) {
+        urma_wr[inc_tx - 1].next = NULL;
+    }
 
     urma_jfs_wr_t *bad_wr = NULL;
     uint64_t start_timestamp = umq_perf_get_start_timestamp_with_feature(queue->dev_ctx->feature);
     urma_status_t status = urma_post_jetty_send_wr(queue->jetty, urma_wr, &bad_wr);
     umq_perf_record_write_with_feature(UMQ_PERF_RECORD_TRANSPORT_POST_SEND, start_timestamp, queue->dev_ctx->feature);
     if (status != URMA_SUCCESS) {
         if (bad_wr != NULL) {
-            process_bad_qbuf(bad_wr, bad_qbuf, qbuf, queue);
+            process_bad_qbuf(bad_wr, bad_qbuf, qbuf, queue, inc_tx);
+        } else {
+            *bad_qbuf = qbuf;
+            umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, inc_tx);
         }
         UMQ_LIMIT_VLOG_ERR("urma_post_jetty_send_wr failed, status %d\n", status);
         ret = -status;
         goto DEC_REF;
     }
-    umq_inc_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, wr_num);
+    if (inc_tx < (uint32_t)wr_num) {
+        *bad_qbuf = (umq_buf_t *)(uintptr_t)urma_wr[inc_tx].user_ctx;
+        ret = -UMQ_ERR_EAGAIN;
+        goto DEC_REF;
+    }
+
     umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1);
     return ret;
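
Reviewer note (not part of the patch): the change replaces the old "read tx_outstanding, compute remain_tx, post, then umq_inc_ref" sequence with an up-front reservation through umq_inc_tx_ref() that may be granted only partially, and every failure path now returns exactly the credits it reserved. A minimal standalone sketch of that reserve/release pattern, using simplified stand-in types (toy_queue_t, toy_reserve_tx, toy_release_tx are illustrative names, not the real ub_queue_t API):

#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
    _Atomic uint32_t tx_outstanding; /* send slots currently reserved */
    uint32_t tx_depth;               /* hardware send-queue depth */
} toy_queue_t;

/* Reserve up to `required` slots; returns how many were granted
 * (possibly fewer than requested, or 0 when the queue is full). */
static uint32_t toy_reserve_tx(toy_queue_t *q, uint32_t required)
{
    uint32_t before = atomic_load_explicit(&q->tx_outstanding, memory_order_relaxed);
    uint32_t after;
    do {
        if (before == q->tx_depth) {
            return 0; /* no credit left: caller reports EAGAIN */
        }
        /* Clamp the grant at tx_depth: a partial grant is allowed. */
        after = (before + required > q->tx_depth) ? q->tx_depth : before + required;
    } while (!atomic_compare_exchange_weak_explicit(&q->tx_outstanding, &before, after,
                                                    memory_order_acq_rel, memory_order_acquire));
    return after - before;
}

/* Return `granted` slots, e.g. on a completion or a failed post. */
static void toy_release_tx(toy_queue_t *q, uint32_t granted)
{
    atomic_fetch_sub_explicit(&q->tx_outstanding, granted, memory_order_acq_rel);
}

int main(void)
{
    toy_queue_t q = { .tx_outstanding = 0, .tx_depth = 4 };
    uint32_t got = toy_reserve_tx(&q, 6); /* asks for 6, only 4 granted */
    printf("granted %" PRIu32 " of 6\n", got);
    toy_release_tx(&q, got);              /* post failed: give credits back */
    return 0;
}

On a partial grant the patch truncates the WR chain in place (urma_wr[inc_tx - 1].next = NULL) and reports the first unposted buffer through *bad_qbuf with -UMQ_ERR_EAGAIN, so the caller can resubmit just the tail instead of the whole chain.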