diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index b53409bb30161bb70f8e56a6776eb54bff8bd703..f1d4320be4d967858db8169273f61c7be5259f05 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -149,6 +149,7 @@ endif() set(CMAKE_SKIP_RPATH TRUE) set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS} -lrt -fno-var-tracking-assignments -pipe -g -Wall -fPIC -fno-omit-frame-pointer -fno-common -fno-stack-protector") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS} -lrt -fno-var-tracking-assignments -Wno-terminate -Wno-delete-incomplete -pipe -Wall -Wno-strict-aliasing -Wtrampolines -D_FORTIFY_SOURCE=2 -fPIC -finline-functions -fstack-protector-strong -s -Wl,-z,noexecstack -Wl,-z,relro,-z,now -finline-limit=6000 --param inline-unit-growth=300") +add_compile_options("-march=armv8-a+sve") # Default including directory include_directories("${CMAKE_CURRENT_SOURCE_DIR}/core/include/" "${CMAKE_CURRENT_SOURCE_DIR}" diff --git a/cpp/streaming/runtime/partitioner/V2/StreamPartitionerV2.h b/cpp/streaming/runtime/partitioner/V2/StreamPartitionerV2.h index b00befeeabe428189ef9df411538ba54dbb0cf12..379484684b7535fb1ca2ba988f65de7a3416d8d0 100644 --- a/cpp/streaming/runtime/partitioner/V2/StreamPartitionerV2.h +++ b/cpp/streaming/runtime/partitioner/V2/StreamPartitionerV2.h @@ -18,6 +18,7 @@ #include "table/data/vectorbatch/VectorBatch.h" #include "vector/vector_helper.h" #include "table/data/Row.h" +#include namespace omnistream { template @@ -80,7 +81,6 @@ public: } delete vectorBatch; - delete streamRecord; return result; } @@ -91,6 +91,35 @@ public: [[nodiscard]] virtual std::string toString() const = 0; virtual std::unique_ptr getUpstreamSubtaskStateMapper() = 0; + void setRowKind_sve(int i, int size, uint8_t* src, int32_t* offsets, uint8_t* dst) { + auto pg = svwhilelt_b32(i, size); + svint32_t offsetRaw = svld1_s32(pg, offsets); + svuint32_t rawData = svld1ub_gather_offset_u32(pg, src, offsetRaw); + + svuint8_t u8_vec = svreinterpret_u8_u32(rawData); + svuint8_t indices = svindex_u8(0, sizeof(uint32_t)); + svuint8_t packed = svtbl(u8_vec, indices); + + auto pg2 = svwhilelt_b8(i, size); + svst1_u8(pg2, dst, packed); + } + + void setTimestamp_sve(int i, int size, int64_t* src, int32_t* offsets, int64_t* dst) { + auto pg = svwhilelt_b32(i, size); + svint32_t offsetRaw = svld1_s32(pg, offsets); + svint64_t offset1 = svunpklo(offsetRaw); + svint64_t offset2 = svunpkhi(offsetRaw); + + auto pg2 = svwhilelt_b64(i, size); + svint64_t rawData = svld1_gather_index(pg2, src, offset1); + svst1_s64(pg2, dst, rawData); + + int jump = svcntd(); + auto pg3 = svwhilelt_b64(i + jump, size); + svint64_t rawData2 = svld1_gather_index(pg3, src, offset2); + svst1_s64(pg3, dst + jump, rawData2); + } + StreamRecord* buildNewStreamRecordBasedOnOffsets(std::vector& offsets, StreamRecord* originStreamRecord, long timestamp) { @@ -110,10 +139,12 @@ public: offsets.data(), 0, offsets.size())); } } - for (size_t i = 0; i < offsets.size(); i++) { + int processElement = svcntw(); + int size = offsets.size(); + for (size_t i = 0; i < offsets.size(); i += processElement) { int position = offsets[i]; - copyedVectorBatch->setRowKind(i, vectorBatch->getRowKind(position)); - copyedVectorBatch->setTimestamp(i, vectorBatch->getTimestamp(position)); + setRowKind_sve(i, size, reinterpret_cast(vectorBatch->getRowKinds()), offsets.data() + i, reinterpret_cast(copyedVectorBatch->getRowKinds()) + i); + setTimestamp_sve(i, size, vectorBatch->getTimestamps(), offsets.data() + i, copyedVectorBatch->getTimestamps() + i); } return new StreamRecord(copyedVectorBatch, timestamp); } diff --git a/cpp/table/data/util/VectorBatchUtil.h b/cpp/table/data/util/VectorBatchUtil.h index efeef0a92b051e24ad92b269717ba79e7a381de8..ff966d247975f91baea32de272acc35227144c64 100644 --- a/cpp/table/data/util/VectorBatchUtil.h +++ b/cpp/table/data/util/VectorBatchUtil.h @@ -13,6 +13,7 @@ #include "table/data/vectorbatch/VectorBatch.h" #include "OmniOperatorJIT/core/src/vector/unsafe_vector.h" +#include class VectorBatchUtil { public: @@ -35,6 +36,36 @@ public: return static_cast(ubatchId); } + static void deComboIDSVE(uint64_t* src, uint32_t* batchIDdst, uint32_t* rowIDdst, int num) + { + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, src + i); + svuint64_t comboID2 = svld1(pg2, src + i + half); + + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } + } + + static void getComboId_sve(int batchId, int rowCount, int64_t* result) { + int processNum = svcntd(); + svint64_t batchData = svlsl_n_s64_x(svptrue_b64(), svdup_n_s64(batchId), 32); + for (int i = 0; i < rowCount; i+= processNum) { + svbool_t pg = svwhilelt_b64(i, rowCount); + svint64_t rowData = svindex_s64(i, 1); + svint64_t comboIDs = svorr_z(pg, batchData, rowData); + svst1_s64(pg, result + i, comboIDs); + } + } + // To print VectorBatch, use VectorHelper::PrintVecBatch from OmniOperatorJIT/core/src/vector/vector_helper.h template diff --git a/cpp/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.cpp b/cpp/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.cpp index d78c103ee7a60b54c4977c57be468cf2a45eb5bb..521ca42a95e8a60f593863afe3a18f2b9f0d5bd0 100644 --- a/cpp/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.cpp +++ b/cpp/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.cpp @@ -45,8 +45,10 @@ unordered_map RowTimeDeduplicateFunction::getUpdateState( { unordered_map tmpState; int curBatchId = getCurrentBatchId() - 1; + long* comboIDs = new long[rowCount]; + VectorBatchUtil::getComboId_sve(curBatchId, rowCount, comboIDs); for (int i = 0; i < rowCount; i++) { - long comboId = VectorBatchUtil::getComboId(curBatchId, i); + long comboId = comboIDs[i]; // 建立key RowData *key = groupByKeySelector->getKey(inputVB, i); @@ -63,11 +65,12 @@ unordered_map RowTimeDeduplicateFunction::getUpdateState( } else if (itTmp != tmpState.end()) { long curComboId = itTmp->second; if (isDuplicate(curComboId, comboId)) { - itTmp->second = comboId; + tmpState[key] = comboId; } delete key; } } + delete[] comboIDs; return tmpState; } diff --git a/cpp/table/runtime/operators/join/AbstractStreamingJoinOperator.h b/cpp/table/runtime/operators/join/AbstractStreamingJoinOperator.h index 8d50d13a6d21e1c36b966cdab166f1b24594b602..a56987fc24a21f16392fe20292b8c5d26ae86118 100644 --- a/cpp/table/runtime/operators/join/AbstractStreamingJoinOperator.h +++ b/cpp/table/runtime/operators/join/AbstractStreamingJoinOperator.h @@ -31,6 +31,8 @@ #include "streaming/api/operators/TimestampedCollector.h" #include "JoinRecordStateView.h" +#include + // joinCondition includes 2 steps: // (1) check if key is null // (2) check if filter condition is satisfied @@ -127,7 +129,7 @@ protected: // Null-padded entries that need to be inserted/deleted std::vector deleteRecords; // Kinds for those null-padded entries based on accumulate(0) or retract(1) - std::vector deleteKinds; + std::vector deleteKinds; FilterFuncPtr generatedFilter = nullptr; JoinedRowFilterFunc joinCondition; @@ -259,12 +261,12 @@ void AbstractStreamingJoinOperator::of( if (RowDataUtil::isAccumulateMsg(input->getRowKind(i))) { if (std::get<1>(it->second) == 0) { deleteRecords.push_back(std::get<2>(it->second)); - deleteKinds.push_back(0); + deleteKinds.push_back(static_cast(0)); } } else { if (std::get<1>(it->second) == 1) { deleteRecords.push_back(std::get<2>(it->second)); - deleteKinds.push_back(1); + deleteKinds.push_back(static_cast(1)); } } int32_t newNumAssociate = RowDataUtil::isAccumulateMsg(input->getRowKind(i))? std::get<1>(it->second) + 1 : std::get<1>(it->second) - 1; @@ -397,10 +399,30 @@ std::unique_ptr> AbstractStreamingJoinOperator::filterRe } } + int num = (*matchedRecords).size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, reinterpret_cast((*matchedRecords).data()) + i); + svuint64_t comboID2 = svld1(pg2, reinterpret_cast((*matchedRecords).data()) + i + half); + + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } + // for the otherSide - for (const int64_t &comboId : *matchedRecords) { - int32_t othersideRowId = VectorBatchUtil::getRowId(comboId); - int32_t othersideBatchId = VectorBatchUtil::getBatchId(comboId); + for (int i = 0; i < num; i++) { + int32_t othersideRowId = rowIDdst[i]; + int32_t othersideBatchId = batchIDdst[i]; for (auto col : colRefsForNonEquiCondition) { bool isLeftColumn = col < leftArity; @@ -413,12 +435,14 @@ std::unique_ptr> AbstractStreamingJoinOperator::filterRe omniruntime::op::ExecutionContext context; auto result = generatedFilter( - vals.data(), reinterpret_cast(nulls.data()), nullptr, &resultBool, nullptr, (int64_t)(&context)); + vals.data(), reinterpret_cast(nulls.data()), nullptr, &resultBool, nullptr, (int64_t)(&context)); + if (result) { - filteredRecords->push_back(comboId); + filteredRecords->push_back((*matchedRecords)[i]); } } - + delete[] rowIDdst; + delete[] batchIDdst; return filteredRecords; } diff --git a/cpp/table/runtime/operators/join/JoinRecordStateView.h b/cpp/table/runtime/operators/join/JoinRecordStateView.h index b1b58d51898ab806f3e3ff30d0d4c6f422991feb..4c2f08f37a216ab136a76e2c8459479d4804c0e6 100644 --- a/cpp/table/runtime/operators/join/JoinRecordStateView.h +++ b/cpp/table/runtime/operators/join/JoinRecordStateView.h @@ -177,6 +177,8 @@ void InputSideHasNoUniqueKey::addOrRectractRecord(omnistream::VectorBatch *in } std::vector xxh128Hashes = input->getXXH128s(); + long* comboIDs = new long[input->GetRowCount()]; + VectorBatchUtil::getComboId_sve(batchId, input->GetRowCount(), comboIDs); for (int i = 0; i < input->GetRowCount(); i++) { if (filterNulls && keySelector->isAnyKeyNull(input, i)) { continue; @@ -188,7 +190,7 @@ void InputSideHasNoUniqueKey::addOrRectractRecord(omnistream::VectorBatch *in recordStateVB->updateOrCreate( ukey, /* default value used only if key is missing and delta is positive */ - UV {1, VectorBatchUtil::getComboId(batchId, i)}, + UV {1, comboIDs[i]}, [delta, &numAssociates, i](UV& val) -> std::optional { int newCount = std::get<0>(val) + delta; if (newCount != 0) { @@ -204,6 +206,7 @@ void InputSideHasNoUniqueKey::addOrRectractRecord(omnistream::VectorBatch *in delete key; } } + delete[] comboIDs; } template diff --git a/cpp/table/runtime/operators/join/StreamingJoinOperator.cpp b/cpp/table/runtime/operators/join/StreamingJoinOperator.cpp index 612de1d9eedd606f49b315dd7b4552c6d8a70e43..2823adf20474716f3b89a68b4d960a19b625e813 100644 --- a/cpp/table/runtime/operators/join/StreamingJoinOperator.cpp +++ b/cpp/table/runtime/operators/join/StreamingJoinOperator.cpp @@ -214,10 +214,26 @@ omniruntime::vec::BaseVector *StreamingJoinOperator::buildOtherSideColumn(omn } } + int num = this->deleteRecords.size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, reinterpret_cast(this->deleteRecords.data()) + i); + svuint64_t comboID2 = svld1(pg2, reinterpret_cast(this->deleteRecords.data()) + i + half); + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } // Loop wont run for inner join as deletedRecords can have elements only if other is Outer - for (auto id : this->deleteRecords) { - auto batchId = VectorBatchUtil::getBatchId(id); - auto rowId = VectorBatchUtil::getRowId(id); + for (int i = 0; i < num; i++) { + auto batchId = batchIDdst[i]; + auto rowId = rowIDdst[i]; if (curbatchId != batchId) { if (otherSideStateView->getVectorBatch(batchId) == nullptr) { throw std::runtime_error("get batch is nullptr in buildOtherSideColumn"); @@ -229,7 +245,8 @@ omniruntime::vec::BaseVector *StreamingJoinOperator::buildOtherSideColumn(omn auto val = inputCol->GetValue(rowId); outputCol->SetValue(rowIndex++, val); } - + delete[] batchIDdst; + delete[] rowIDdst; return outputCol; } @@ -274,17 +291,33 @@ omniruntime::vec::BaseVector *StreamingJoinOperator::buildOtherSideColumnVarc outputCol->SetNull(rowIndex++); } } + int num = this->deleteRecords.size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, reinterpret_cast(this->deleteRecords.data()) + i); + svuint64_t comboID2 = svld1(pg2, reinterpret_cast(this->deleteRecords.data()) + i + half); + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } // Loop wont run for inner join as deletedRecords can have elements only if other is Outer - for (auto id : this->deleteRecords) { - auto batchId = VectorBatchUtil::getBatchId(id); - auto rowId = VectorBatchUtil::getRowId(id); - auto vectorBatch = otherSideStateView->getVectorBatch(batchId); - if (vectorBatch == nullptr) { + for (int i = 0; i < num; i++) { + auto batchId = batchIDdst[i]; + auto rowId = rowIDdst[i]; + auto inputCol = otherSideStateView->getVectorBatch(batchId)->Get(icol); + if (otherSideStateView->getVectorBatch(batchId) == nullptr) { LOG("string from vectorBatch is nullptr") throw std::runtime_error("string from vectorBatch is nullptr"); } - auto inputCol = vectorBatch->Get(icol); if (inputCol->GetEncoding() == OMNI_FLAT) { auto castedCol = reinterpret_cast(inputCol); auto sv = castedCol->GetValue(rowId); @@ -295,7 +328,8 @@ omniruntime::vec::BaseVector *StreamingJoinOperator::buildOtherSideColumnVarc outputCol->SetValue(rowIndex++, sv); } } - + delete[] batchIDdst; + delete[] rowIDdst; return outputCol; } @@ -445,6 +479,23 @@ RowKind StreamingJoinOperator::getOutputVBRowKind(omnistream::VectorBatch *in } } +template +void StreamingJoinOperator::setRowKind_sve(int i, int size, uint8_t* dst, int8_t* condition) { + auto pg = svwhilelt_b8(i, size); + svint8_t conditionData = svld1_s8(pg, condition); + svbool_t mask = svcmpeq_n_s8(pg, conditionData, 0); + svuint8_t data = svsel_u8(mask, svdup_n_u8(3), svdup_n_u8(0)); + svst1_u8(pg, dst, data); +} + +template +void StreamingJoinOperator::setTimestamp_raw(int start, int size, const int64_t* src, int64_t* dst, int rowIndex) { + int processElement = svcntb(); + for (int i = 0; i < processElement && start + i < size; i++) { + dst[i + rowIndex] = src[i + start]; + } +} + template void StreamingJoinOperator::setOutPutMetaData(omnistream::VectorBatch *input, bool inputIsOuter, bool otherIsOuter, omnistream::VectorBatch *outputVB) diff --git a/cpp/table/runtime/operators/join/StreamingJoinOperator.h b/cpp/table/runtime/operators/join/StreamingJoinOperator.h index 13ff02fccd33d2dccb0d341f9f34c09d225b9255..69a5cbf5a8cd866d562a00fe2fc9d8875c7628fc 100644 --- a/cpp/table/runtime/operators/join/StreamingJoinOperator.h +++ b/cpp/table/runtime/operators/join/StreamingJoinOperator.h @@ -18,6 +18,7 @@ #include "table/data/util/RowDataUtil.h" #include "table/data/vectorbatch/VectorBatch.h" #include "OmniOperatorJIT/core/src/vector/large_string_container.h" +#include template class StreamingJoinOperator : public AbstractStreamingJoinOperator { @@ -183,6 +184,11 @@ private: void AssembleSecondTime(omnistream::VectorBatch* input, omnistream::VectorBatch* outputVB, JoinRecordStateView *otherSideStateView, bool inputIsLeft); + + void setRowKind_sve(int i, int size, uint8_t* dst, int8_t* condition); + + void setTimestamp_raw(int start, int size, const int64_t* src, int64_t* dst, int rowIndex); + void DealOneBatchInColumnVarchar(long id, int32_t icol, int& rowIndex, JoinRecordStateView *otherSideStateView, omniruntime::vec::Vector>*& outputCol); template diff --git a/cpp/table/runtime/operators/join/window/WindowJoinOperator.h b/cpp/table/runtime/operators/join/window/WindowJoinOperator.h index 4b9913dcf2dc201a89eacf79c1c7b3283618959a..cf109373d26c1e4cd587ace21225288a57fe011d 100644 --- a/cpp/table/runtime/operators/join/window/WindowJoinOperator.h +++ b/cpp/table/runtime/operators/join/window/WindowJoinOperator.h @@ -29,6 +29,8 @@ #include "OmniOperatorJIT/core/src/vector/unsafe_vector.h" #include "OmniOperatorJIT/core/src/operator/execution_context.h" +#include + using VectorBatchId = uint64_t; using namespace omnistream; @@ -508,21 +510,37 @@ template inline void WindowJoinOperator::insertLeft(int colIdx, std::vector *leftElements, std::vector *rightElements, omnistream::VectorBatch *outputBatch, bool isInner) { + int num = (*leftElements).size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, (*leftElements).data() + i); + svuint64_t comboID2 = svld1(pg2, (*leftElements).data() + i + half); + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } auto col = reinterpret_cast *>(outputBatch->Get(colIdx)); if (isNonEquiCondition || !isInner) { int rowIdx = 0; - for (auto element : *leftElements) { - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + for (int j = 0; j < num; j++) { + int batchId = batchIDdst[j]; + int rowId = rowIDdst[j]; col->SetValue( rowIdx, leftWindowState->getVectorBatch(batchId)->template GetValueAt(colIdx, rowId)); rowIdx++; } } else { int rowIdx = 0; - for (auto element : *leftElements) { - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + for (int j = 0; j < num; j++) { + int batchId = batchIDdst[j]; + int rowId = rowIDdst[j]; auto value = leftWindowState->getVectorBatch(batchId)->template GetValueAt(colIdx, rowId); for (size_t i = 0; i < rightElements->size(); i++) { col->SetValue(i + rowIdx, value); @@ -530,6 +548,8 @@ inline void WindowJoinOperator::insertLeft(int colIdx, std::vectorsize(); } } + delete batchIDdst; + delete rowIDdst; } template @@ -550,9 +570,28 @@ void WindowJoinOperator::insertLeftVarchar(int colIdx, std::vector( leftWindowState->getVectorBatch(batchId)->Get(colIdx))->GetValue(rowId); for (size_t i = 0; i < rightElements->size(); i++) { @@ -560,6 +599,8 @@ void WindowJoinOperator::insertLeftVarchar(int colIdx, std::vectorsize(); } + delete batchIDdst; + delete rowIDdst; } } @@ -568,12 +609,28 @@ template inline void WindowJoinOperator::insertRight(int colIdx, std::vector *leftElements, std::vector *rightElements, omnistream::VectorBatch *outputBatch, bool isInner) { + int num = (*rightElements).size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, (*rightElements).data() + i); + svuint64_t comboID2 = svld1(pg2, (*rightElements).data() + i + half); + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } auto col = reinterpret_cast *>(outputBatch->Get(colIdx)); if (isNonEquiCondition || !isInner) { int rowIdx = 0; - for (auto element : *rightElements) { - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + for (int i = 0; i < num; i++) { + int batchId = batchIDdst[i]; + int rowId = rowIDdst[i]; col->SetValue(rowIdx, rightWindowState->getVectorBatch(batchId)->template GetValueAt( colIdx - leftTypes.size(), rowId)); @@ -582,8 +639,8 @@ inline void WindowJoinOperator::insertRight(int colIdx, std::vectorsize(); i++) { auto element = rightElements->at(i); - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + int batchId = batchIDdst[i]; + int rowId = rowIDdst[i]; auto value = rightWindowState->getVectorBatch(batchId)->template GetValueAt( colIdx - leftTypes.size(), rowId); for (size_t j = 0; j < leftElements->size(); j++) { @@ -592,6 +649,8 @@ inline void WindowJoinOperator::insertRight(int colIdx, std::vector @@ -611,10 +670,26 @@ void WindowJoinOperator::insertRightVarchar(int colIdx, std::vectorsize(); i++) { auto element = rightElements->at(i); - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + int batchId = batchIDdst[i]; + int rowId = rowIDdst[i]; auto value = reinterpret_cast( rightWindowState->getVectorBatch(batchId)->Get(colIdx - leftTypes.size()))->GetValue(rowId); for (size_t j = 0; j < leftElements->size(); j++) { @@ -622,6 +697,8 @@ void WindowJoinOperator::insertRightVarchar(int colIdx, std::vectorSetValue(valIdx, value); } } + delete[] batchIDdst; + delete[] rowIDdst; } } template