diff --git a/huawei/mindspore/small_models/renset50_8p/benchmark.sh b/huawei/mindspore/small_models/renset50_8p/benchmark.sh
new file mode 100644
index 0000000000000000000000000000000000000000..bf1afa464228b4551422095711eaee0fc7aa5ae0
--- /dev/null
+++ b/huawei/mindspore/small_models/renset50_8p/benchmark.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+declare -i ret_ok=0
+declare -i ret_error=1
+
+CUR_DIR=$(cd "$(dirname "$0")";pwd)
+
+RESNET_CODE_PATH="/path/to/ResNet"
+export RANK_TABLE_FILE="/path/to/ckpt/save/hccl.json"
+export TRAIN_DATASET_PATH="/path/to/dataset"
+export EVAL_DATASET_PATH="/path/to/dataset"
+export SAVE_CKPT_EPOCH=10
+
+
+TRAIN_PY_SCRIPT="${RESNET_CODE_PATH}/train.py"
+EVAL_PY_SCRIPT="${RESNET_CODE_PATH}/eval.py"
+LAUNCH_SH_SCRIPT="${RESNET_CODE_PATH}/scripts/run_distribute_train.sh"
+EVAL_SH_SCRIPT="${RESNET_CODE_PATH}/scripts/run_eval.sh"
+LAUNCH_CONFIG_PATH="${RESNET_CODE_PATH}/config/resnet50_imagenet2012_Boost_config.yaml"
+MODIFIED_CKPT_PATH="${RESNET_CODE_PATH}/scripts/outputs/resnet50/imagenet2012/ckpt/resnet50_output.ckpt"
+
+export AIS_BENCH_LOGGING_DIR="${CUR_DIR}/../logging_result"
+
+
+function replace_resnet_script()
+{
+    if [ -d ${AIS_BENCH_LOGGING_DIR} ];then
+        rm -rf ${AIS_BENCH_LOGGING_DIR}
+    fi
+    mkdir -p ${AIS_BENCH_LOGGING_DIR}
+
+    if [ ! -d ${RESNET_CODE_PATH} ];then
+        echo "origin_code_path:${RESNET_CODE_PATH} not found!"
+        return $ret_error
+    fi
+
+    cp "${CUR_DIR}/code_to_replace/train.py" ${TRAIN_PY_SCRIPT} || { echo "cp failed";return $ret_error; }
+
+    cp "${CUR_DIR}/code_to_replace/eval.py" ${EVAL_PY_SCRIPT} || { echo "cp failed";return $ret_error; }
+
+    cp "${CUR_DIR}/code_to_replace/run_distribute_train.sh" ${LAUNCH_SH_SCRIPT} || { echo "cp failed";return $ret_error; }
+
+    cp "${CUR_DIR}/code_to_replace/run_eval.sh" ${EVAL_SH_SCRIPT} || { echo "cp failed";return $ret_error; }
+
+    return $ret_ok
+}
+
+function launch_run()
+{
+    rm -rf ${RESNET_CODE_PATH}/scripts/outputs
+    cd ${RESNET_CODE_PATH}/scripts
+    bash ${LAUNCH_SH_SCRIPT} ${RANK_TABLE_FILE} ${TRAIN_DATASET_PATH} ${LAUNCH_CONFIG_PATH} || { echo "run train failed!";return $ret_error; }
+    cd ${CUR_DIR}
+    return $ret_ok
+}
+
+function launch_eval()
+{
+    cd ${RESNET_CODE_PATH}/scripts
+    mv ${RESNET_CODE_PATH}/scripts/outputs/resnet50/imagenet2012/ckpt/resnet50-*.ckpt ${MODIFIED_CKPT_PATH}
+    bash ${EVAL_SH_SCRIPT} ${EVAL_DATASET_PATH} \
+        ${MODIFIED_CKPT_PATH} ${LAUNCH_CONFIG_PATH} || { echo "run eval failed!";return $ret_error; }
+    cd ${CUR_DIR}
+    return $ret_ok
+}
+
+function collect_result()
+{
+    python3 -c "from ais_bench.logging import collect_report; collect_report('training', ['${AIS_BENCH_LOGGING_DIR}'])"
+}
+
+function main()
+{
+    replace_resnet_script || { echo "replace_launch_script failed";return $ret_error; }
+    launch_run || { echo "launch_run failed!";return $ret_error; }
+    launch_eval || { echo "launch_eval failed!";return $ret_error; }
+    collect_result || { echo "collect train result failed!";return $ret_error; }
+    return $ret_ok
+}
+
+main "$@"
+exit $?
\ No newline at end of file
diff --git a/huawei/mindspore/small_models/renset50_8p/build.sh b/huawei/mindspore/small_models/renset50_8p/build.sh
new file mode 100644
index 0000000000000000000000000000000000000000..4da3264613737dfe1052ff56367cd13c69d9c849
--- /dev/null
+++ b/huawei/mindspore/small_models/renset50_8p/build.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+declare -i ret_ok=0
+declare -i ret_failed=1
+
+CURRENT_DIR=$(cd "$(dirname "$0")";pwd)
+OUTPUT_DIR=${CURRENT_DIR}/output
+ARCH=$(uname -i)
+STUBS_DIR_NAME=Ais-Benchmark-Stubs-${ARCH}
+LOGGING_RESOURCE=https://gitee.com/aisbench/logging/releases/download/v2.1_alpha5/ais_bench_logging-2.1-py3-none-linux_${ARCH}.whl
+
+function clean_cached() {
+    if [ -d ${OUTPUT_DIR} ];then
+        rm -rf ${OUTPUT_DIR}
+    fi
+    mkdir -p ${OUTPUT_DIR} || { echo "mkdir output failed!";return ${ret_failed}; }
+    return ${ret_ok}
+}
+
+function unpack_stubs_pkg() {
+    pkg_path=$1
+    pkg_name=$(basename "${pkg_path}")
+    cp ${pkg_path} ${OUTPUT_DIR} || { echo "cp stubs pkg failed!"; return ${ret_failed}; }
+    cd ${OUTPUT_DIR}
+    tar -xzf ${pkg_name} || { echo "unpack stubs pkg failed!"; return ${ret_failed}; }
+    cd ${CURRENT_DIR}
+    return ${ret_ok}
+}
+
+function add_logging_whl() {
+    cd ${OUTPUT_DIR}/${STUBS_DIR_NAME}
+    mkdir -p pkgs || { echo "mkdir pkgs dir failed!";return ${ret_failed}; }
+    cd ${OUTPUT_DIR}/${STUBS_DIR_NAME}/pkgs
+    wget -t 0 ${LOGGING_RESOURCE} --no-check-certificate || { echo "wget logging whl failed!";return ${ret_failed}; }
+    cd ${CURRENT_DIR}
+    return ${ret_ok}
+}
+
+function code_replace() {
+    cd ${OUTPUT_DIR}/${STUBS_DIR_NAME}/code
+    rm -f ./*
+    cp ${CURRENT_DIR}/benchmark.sh ./
+    cp -r ${CURRENT_DIR}/code_to_replace ./
+    cd ${CURRENT_DIR}
+}
+
+function pack_pkg() {
+    cd ${OUTPUT_DIR}/
+    tar cvzf Ais-Benchmark-Stubs-${ARCH}-mindspore-resnet50-train.tar.gz Ais-Benchmark-Stubs-${ARCH} || { echo "pack pkg failed!";return ${ret_failed}; }
+    cd ${CURRENT_DIR}
+    return ${ret_ok}
+}
+
+function main() {
+    stubs_pkg_path=$1
+    clean_cached
+    unpack_stubs_pkg ${stubs_pkg_path} || { return ${ret_failed}; }
+    add_logging_whl || { return ${ret_failed}; }
+    code_replace
+    pack_pkg || { return ${ret_failed}; }
+    return ${ret_ok}
+}
+
+main "$@"
+exit $?
diff --git a/huawei/mindspore/small_models/renset50_8p/code_to_replace/eval.py b/huawei/mindspore/small_models/renset50_8p/code_to_replace/eval.py
new file mode 100644
index 0000000000000000000000000000000000000000..c62900276dfc69e35917e1b0135a7f7d32f377a6
--- /dev/null
+++ b/huawei/mindspore/small_models/renset50_8p/code_to_replace/eval.py
@@ -0,0 +1,117 @@
+# Copyright 2020-2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""eval resnet."""
+import os
+import mindspore
+from mindspore import Tensor
+from mindspore.nn.optim import Momentum
+from mindspore.nn.loss import SoftmaxCrossEntropyWithLogits
+from src.CrossEntropySmooth import CrossEntropySmooth
+from src.model_utils.config import config
+from src.model_utils.moxing_adapter import moxing_wrapper
+
+import ais_bench.logging as aislog
+
+mindspore.set_seed(1)
+
+if config.net_name in ("resnet18", "resnet34", "resnet50", "resnet152"):
+    if config.net_name == "resnet18":
+        from src.resnet import resnet18 as resnet
+    elif config.net_name == "resnet34":
+        from src.resnet import resnet34 as resnet
+    elif config.net_name == "resnet50":
+        from src.resnet import resnet50 as resnet
+    else:
+        from src.resnet import resnet152 as resnet
+    if config.dataset == "cifar10":
+        from src.dataset import create_dataset1 as create_dataset
+    else:
+        from src.dataset import create_dataset2 as create_dataset
+
+elif config.net_name == "resnet101":
+    from src.resnet import resnet101 as resnet
+    from src.dataset import create_dataset3 as create_dataset
+else:
+    from src.resnet import se_resnet50 as resnet
+    from src.dataset import create_dataset4 as create_dataset
+
+
+def init_group_params(net):
+    decayed_params = []
+    no_decayed_params = []
+    for param in net.trainable_params():
+        if 'beta' not in param.name and 'gamma' not in param.name and 'bias' not in param.name:
+            decayed_params.append(param)
+        else:
+            no_decayed_params.append(param)
+
+    group_params = [{'params': decayed_params, 'weight_decay': config.weight_decay},
+                    {'params': no_decayed_params},
+                    {'order_params': net.trainable_params()}]
+    return group_params
+
+
+@moxing_wrapper()
+def eval_net():
+    """eval net"""
+    target = config.device_target
+    # init context
+    mindspore.set_context(mode=0, device_target=target, save_graphs=False, jit_config={"jit_level": "O2"})
+    if target == "Ascend":
+        device_id = int(os.getenv('DEVICE_ID', '0'))
+        mindspore.set_context(device_id=device_id)
+
+    # create dataset
+    dataset = create_dataset(dataset_path=config.data_path, do_train=False, batch_size=config.batch_size,
+                             eval_image_size=config.eval_image_size,
+                             target=target)
+
+    # define net
+    net = resnet(class_num=config.class_num)
+
+    # load checkpoint
+    param_dict = mindspore.load_checkpoint(config.checkpoint_file_path)
+    mindspore.load_param_into_net(net, param_dict)
+    net.set_train(False)
+
+    # define loss, model
+    if config.dataset == "imagenet2012":
+        if not config.use_label_smooth:
+            config.label_smooth_factor = 0.0
+        loss = CrossEntropySmooth(sparse=True, reduction='mean',
+                                  smooth_factor=config.label_smooth_factor,
+                                  num_classes=config.class_num)
+    else:
+        loss = SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
+
+    #Currently, boost inference only supports scenarios with optimizers
+    #Optimizer waiting for decoupling in boost model
+    group_params = init_group_params(net)
+    opt = Momentum(group_params, Tensor(0.0), config.momentum, loss_scale=config.loss_scale)
+
+    # define model, add boostmode for eval scenarios with train.py
+    model = mindspore.Model(net, loss_fn=loss, boost_level=config.boost_mode,
+                            optimizer=opt, metrics={'top_1_accuracy', 'top_5_accuracy'})
+
+    # eval model
+    res = model.eval(dataset)
+    print("result:", res, "ckpt=", config.checkpoint_file_path)
+    aislog.init("training", os.getenv("AIS_BENCH_LOGGING_DIR"))
+    aislog.event("accuracy", f"top_1_accuracy: {res['top_1_accuracy']}")
+    aislog.event("result", "OK")
+    aislog.finish()
+
+if __name__ == '__main__':
+    eval_net()
diff --git a/huawei/mindspore/small_models/renset50_8p/code_to_replace/run_distribute_train.sh b/huawei/mindspore/small_models/renset50_8p/code_to_replace/run_distribute_train.sh
new file mode 100644
index 0000000000000000000000000000000000000000..c258a31c3446d3d1b15c70e5be73831cde157dbe
--- /dev/null
+++ 
b/huawei/mindspore/small_models/renset50_8p/code_to_replace/run_distribute_train.sh
@@ -0,0 +1,169 @@
+#!/bin/bash
+# Copyright 2020-2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+CURPATH="$(dirname "$0")"
+# shellcheck source=/dev/null
+. ${CURPATH}/cache_util.sh
+
+if [ $# != 3 ] && [ $# != 4 ] && [ $# != 5 ] && [ $# != 6 ]
+then
+    echo "Usage: bash run_distribute_train.sh [RANK_TABLE_FILE] [DATASET_PATH] [CONFIG_PATH]"
+    echo "Usage: bash run_distribute_train.sh [RANK_TABLE_FILE] [DATASET_PATH] [CONFIG_PATH] [RESUME_CKPT](optional)"
+    echo "Usage: bash run_distribute_train.sh [RANK_TABLE_FILE] [DATASET_PATH] [CONFIG_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional)"
+    echo "Usage: bash run_distribute_train.sh [RANK_TABLE_FILE] [DATASET_PATH] [CONFIG_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional) [RESUME_CKPT](optional)"
+    exit 1
+fi
+
+get_real_path(){
+    if [ "${1:0:1}" == "/" ]; then
+        echo "$1"
+    else
+        echo "$(realpath -m $PWD/$1)"
+    fi
+}
+
+PATH1=$(get_real_path $1)
+PATH2=$(get_real_path $2)
+CONFIG_FILE=$(get_real_path $3)
+str="Boost_"
+if [[ $CONFIG_FILE =~ $str ]]
+then
+    export MS_ENABLE_FORMAT_MODE=0
+fi
+
+if [ $# == 4 ]
+then
+    RESUME_CKPT=$(get_real_path $4)
+fi
+
+if [ $# == 5 ]
+then
+    RUN_EVAL=$4
+    EVAL_DATASET_PATH=$(get_real_path $5)
+fi
+
+if [ $# == 6 ]
+then
+    RUN_EVAL=$4
+    EVAL_DATASET_PATH=$(get_real_path $5)
+    RESUME_CKPT=$(get_real_path $6)
+fi
+
+if [ ! -f $PATH1 ]
+then
+    echo "error: RANK_TABLE_FILE=$PATH1 is not a file"
+exit 1
+fi
+
+if [ ! -d $PATH2 ]
+then
+    echo "error: DATASET_PATH=$PATH2 is not a directory"
+exit 1
+fi
+
+if [ $# == 4 ] && [ ! -f $RESUME_CKPT ]
+then
+    echo "error: RESUME_CKPT=$RESUME_CKPT is not a file"
+exit 1
+fi
+
+if [ "x${RUN_EVAL}" == "xTrue" ] && [ ! -d $EVAL_DATASET_PATH ]
+then
+    echo "error: EVAL_DATASET_PATH=$EVAL_DATASET_PATH is not a directory"
+    exit 1
+fi
+
+if [ "x${RUN_EVAL}" == "xTrue" ]
+then
+    bootup_cache_server
+    CACHE_SESSION_ID=$(generate_cache_session)
+fi
+
+ulimit -u unlimited
+# Number of single machine cards
+export LOCAL_DEVICE_NUM=8
+# Total number of cards used in training
+export RANK_SIZE=8
+export RANK_TABLE_FILE=$PATH1
+export MS_ASCEND_CHECK_OVERFLOW_MODE="SATURATION_MODE"
+
+export SERVER_ID=0
+rank_start=$((LOCAL_DEVICE_NUM * SERVER_ID))
+
+cpus=`cat /proc/cpuinfo| grep "processor"| wc -l`
+avg=`expr $cpus \/ $LOCAL_DEVICE_NUM`
+gap=`expr $avg \- 1`
+
+for((i=0; i<${LOCAL_DEVICE_NUM}; i++))
+do
+    start=`expr $i \* $avg`
+    end=`expr $start \+ $gap`
+    cmdopt=$start"-"$end
+    export DEVICE_ID=${i}
+    export RANK_ID=$((rank_start + i))
+    rm -rf ./train_parallel$i
+    mkdir ./train_parallel$i
+    cp ../*.py ./train_parallel$i
+    cp *.sh ./train_parallel$i
+    cp -r ../config/*.yaml ./train_parallel$i
+    cp -r ../src ./train_parallel$i
+    cd ./train_parallel$i || exit
+    echo "start training for rank $RANK_ID, device $DEVICE_ID"
+    env > env.log
+    if [ $# == 3 ]
+    then
+        if [ "${i}" == "0" ]
+        then
+            taskset -c $cmdopt python train.py --run_distribute=True --device_num=$RANK_SIZE --data_path=$PATH2 \
+            --save_checkpoint=True --save_checkpoint_epochs=${SAVE_CKPT_EPOCH} \
+            --config_path=$CONFIG_FILE --output_dir '../outputs' | tee -a log.txt &
+        else
+            taskset -c $cmdopt python train.py --run_distribute=True --device_num=$RANK_SIZE --data_path=$PATH2 \
+            --save_checkpoint=True --save_checkpoint_epochs=${SAVE_CKPT_EPOCH} \
+            --config_path=$CONFIG_FILE --output_dir '../outputs' &> log.txt &
+        fi
+    fi
+
+    if [ $# == 4 ]
+    then
+        taskset -c $cmdopt python train.py --run_distribute=True --device_num=$RANK_SIZE --data_path=$PATH2 --resume_ckpt=$RESUME_CKPT \
+        --config_path=$CONFIG_FILE --output_dir '../outputs' &> log.txt &
+    fi
+
+    if [ $# == 5 ]
+    then
+        taskset -c $cmdopt python train.py --run_distribute=True --device_num=$RANK_SIZE --data_path=$PATH2 \
+        --run_eval=$RUN_EVAL --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True \
+        --cache_session_id=$CACHE_SESSION_ID --config_path=$CONFIG_FILE --output_dir '../outputs' &> log.txt &
+        if [ "x${RUN_EVAL}" == "xTrue" ]
+        then
+            echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
+        fi
+    fi
+
+    if [ $# == 6 ]
+    then
+        taskset -c $cmdopt python train.py --run_distribute=True --device_num=$RANK_SIZE --data_path=$PATH2 \
+        --run_eval=$RUN_EVAL --eval_dataset_path=$EVAL_DATASET_PATH --enable_cache=True --resume_ckpt=$RESUME_CKPT \
+        --cache_session_id=$CACHE_SESSION_ID --config_path=$CONFIG_FILE --output_dir '../outputs' &> log.txt &
+        if [ "x${RUN_EVAL}" == "xTrue" ]
+        then
+            echo -e "\nWhen training run is done, remember to shut down the cache server via \"cache_admin --stop\""
+        fi
+    fi
+    cd ..
+done
+wait
\ No newline at end of file
diff --git a/huawei/mindspore/small_models/renset50_8p/code_to_replace/run_eval.sh b/huawei/mindspore/small_models/renset50_8p/code_to_replace/run_eval.sh
new file mode 100644
index 0000000000000000000000000000000000000000..3921c1d6a6b726a82e68a79a31d8e38be802fb5a
--- /dev/null
+++ b/huawei/mindspore/small_models/renset50_8p/code_to_replace/run_eval.sh
@@ -0,0 +1,72 @@
+#!/bin/bash
+# Copyright 2020-2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+if [ $# != 3 ] && [ $# != 4 ]
+then
+    echo "Usage: bash run_eval.sh [DATASET_PATH] [CHECKPOINT_PATH] [CONFIG_PATH] [DEVICE_ID](optional)"
+exit 1
+fi
+
+get_real_path(){
+    if [ "${1:0:1}" == "/" ]; then
+        echo "$1"
+    else
+        echo "$(realpath -m $PWD/$1)"
+    fi
+}
+
+PATH1=$(get_real_path $1)
+PATH2=$(get_real_path $2)
+CONFIG_FILE=$(get_real_path $3)
+
+
+if [ ! -d $PATH1 ]
+then
+    echo "error: DATASET_PATH=$PATH1 is not a directory"
+exit 1
+fi
+
+if [ ! -f $PATH2 ]
+then
+    echo "error: CHECKPOINT_PATH=$PATH2 is not a file"
+exit 1
+fi
+
+if [ $# -eq 4 ]; then
+    DEVICE_ID=$4
+else
+    DEVICE_ID=0
+fi
+
+export LOCAL_DEVICE_NUM=1
+export RANK_SIZE=1
+export RANK_ID=0
+export DEVICE_ID=$DEVICE_ID
+
+if [ -d "eval" ];
+then
+    rm -rf ./eval
+fi
+mkdir ./eval
+cp ../*.py ./eval
+cp *.sh ./eval
+cp -r ../config/*.yaml ./eval
+cp -r ../src ./eval
+cd ./eval || exit
+env > env.log
+echo "start evaluation"
+python eval.py --data_path=$PATH1 --checkpoint_file_path=$PATH2 --config_path=$CONFIG_FILE | tee -a log.txt
+cd ..
diff --git a/huawei/mindspore/small_models/renset50_8p/code_to_replace/train.py b/huawei/mindspore/small_models/renset50_8p/code_to_replace/train.py
new file mode 100644
index 0000000000000000000000000000000000000000..550348ba9c2e2d40a8a1bd83b5343d9817f8fc7e
--- /dev/null
+++ b/huawei/mindspore/small_models/renset50_8p/code_to_replace/train.py
@@ -0,0 +1,252 @@
+# Copyright 2020-2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""train resnet."""
+import os
+
+import mindspore
+import mindspore.nn as nn
+import mindspore.log as logger
+from mindspore.train.train_thor import ConvertModelUtils
+from mindspore.train.callback import ModelCheckpoint, CheckpointConfig
+from mindspore.communication.management import init, get_rank
+from mindspore.parallel import set_algo_parameters
+
+from src.logger import get_logger
+from src.lr_generator import get_lr, warmup_cosine_annealing_lr
+from src.CrossEntropySmooth import CrossEntropySmooth
+from src.callback import LossCallBack, ResumeCallback
+from src.util import eval_callback, init_weight, init_group_params, set_output_dir
+from src.metric import DistAccuracy, ClassifyCorrectCell
+from src.model_utils.config import config
+from src.model_utils.moxing_adapter import moxing_wrapper
+from src.model_utils.device_adapter import get_device_num
+
+import ais_bench.logging as aislog
+
+mindspore.set_seed(1)
+
+if config.net_name in ("resnet18", "resnet34", "resnet50", "resnet152"):
+    if config.net_name == "resnet18":
+        from src.resnet import resnet18 as resnet
+    elif config.net_name == "resnet34":
+        from src.resnet import resnet34 as resnet
+    elif config.net_name == "resnet50":
+        from src.resnet import resnet50 as resnet
+    else:
+        from src.resnet import resnet152 as resnet
+    if config.dataset == "cifar10":
+        from src.dataset import create_dataset1 as create_dataset
+    else:
+        from src.dataset import create_dataset2 as create_dataset
+elif config.net_name == "resnet101":
+    from src.resnet import resnet101 as resnet
+    from src.dataset import create_dataset3 as create_dataset
+else:
+    from src.resnet import se_resnet50 as resnet
+    from src.dataset import create_dataset4 as create_dataset
+
+
+def set_graph_kernel_context(run_platform, net_name):
+    if run_platform == "GPU" and net_name == "resnet101":
+        mindspore.set_context(enable_graph_kernel=True)
+        mindspore.set_context(graph_kernel_flags="--enable_parallel_fusion --enable_expand_ops=Conv2D")
+
+
+def set_parameter():
+    """set_parameter"""
+    target = config.device_target
+    if target == "CPU":
+        config.run_distribute = False
+
+    # init context
+    if config.mode_name == 'GRAPH':
+        if target == "Ascend":
+            rank_save_graphs_path = os.path.join(config.save_graphs_path, "soma", str(os.getenv('DEVICE_ID', '0')))
+            mindspore.set_context(mode=0, device_target=target, save_graphs=config.save_graphs,
+                                  save_graphs_path=rank_save_graphs_path, jit_config={"jit_level": "O2"})
+        else:
+            mindspore.set_context(mode=0, device_target=target, save_graphs=config.save_graphs,
+                                  jit_config={"jit_level": "O2"})
+        set_graph_kernel_context(target, config.net_name)
+    else:
+        mindspore.set_context(mode=1, device_target=target, save_graphs=False)
+    set_ascend_max_device_memory()
+    if config.parameter_server:
+        mindspore.set_ps_context(enable_ps=True)
+    if config.run_distribute:
+        if target == "Ascend":
+            device_id = int(os.getenv('DEVICE_ID', '0'))
+            mindspore.set_context(device_id=device_id)
+            mindspore.set_auto_parallel_context(device_num=config.device_num, gradients_mean=True,
+                                                parallel_mode=mindspore.ParallelMode.DATA_PARALLEL)
+            set_algo_parameters(elementwise_op_strategy_follow=True)
+            if config.net_name == "resnet50" or config.net_name == "se-resnet50":
+                if config.boost_mode not in ["O1", "O2"]:
+                    mindspore.set_auto_parallel_context(all_reduce_fusion_config=config.all_reduce_fusion_config)
+            elif config.net_name in ["resnet101", "resnet152"]:
+                mindspore.set_auto_parallel_context(all_reduce_fusion_config=config.all_reduce_fusion_config)
+            init()
+        # GPU target
+        else:
+            init()
+            mindspore.set_auto_parallel_context(device_num=get_device_num(), gradients_mean=True,
+                                                parallel_mode=mindspore.ParallelMode.DATA_PARALLEL)
+            if config.net_name == "resnet50":
+                mindspore.set_auto_parallel_context(all_reduce_fusion_config=config.all_reduce_fusion_config)
+    config.rank_id = get_rank() if config.run_distribute else 0
+
+
+def init_lr(step_size):
+    """init lr"""
+    if config.optimizer == "Thor":
+        from src.lr_generator import get_thor_lr
+        lr = get_thor_lr(config.start_epoch * step_size, config.lr_init, config.lr_decay, config.lr_end_epoch,
+                         step_size, decay_epochs=39)
+    else:
+        if config.net_name in ("resnet18", "resnet34", "resnet50", "resnet152", "se-resnet50"):
+            config.lr_max = config.lr_max / 8 * config.device_num
+            lr = get_lr(lr_init=config.lr_init, lr_end=config.lr_end, lr_max=config.lr_max,
+                        warmup_epochs=config.warmup_epochs, total_epochs=config.epoch_size,
+                        start_epoch=config.start_epoch, steps_per_epoch=step_size, lr_decay_mode=config.lr_decay_mode)
+        else:
+            lr = warmup_cosine_annealing_lr(config.lr, step_size, config.warmup_epochs, config.epoch_size,
+                                            config.start_epoch * step_size)
+    return lr
+
+
+def init_loss_scale():
+    if config.dataset == "imagenet2012":
+        if not config.use_label_smooth:
+            config.label_smooth_factor = 0.0
+        loss = CrossEntropySmooth(sparse=True, reduction="mean",
+                                  smooth_factor=config.label_smooth_factor, num_classes=config.class_num)
+    else:
+        loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
+    return loss
+
+
+def set_ascend_max_device_memory():
+    if mindspore.get_context("enable_ge") and mindspore.get_context("mode") == 0 and \
+        hasattr(config, "max_device_memory"):
+        logger.warning("When encountering a memory shortage situation in 1980B, reduce the max_device_memory.")
+        mindspore.set_context(max_device_memory=config.max_device_memory)
+
+
+@moxing_wrapper()
+def train_net():
+    """train net"""
+    target = config.device_target
+    set_parameter()
+    set_output_dir(config)
+    config.logger = get_logger(config.log_dir, config.rank_id, config.parameter_server)
+    dataset = create_dataset(dataset_path=config.data_path, do_train=True,
+                             batch_size=config.batch_size, train_image_size=config.train_image_size,
+                             eval_image_size=config.eval_image_size, target=target,
+                             distribute=config.run_distribute)
+    step_size = dataset.get_dataset_size()
+    net = resnet(class_num=config.class_num)
+    if config.parameter_server:
+        net.set_param_ps()
+    init_weight(net, config)
+
+    if config.resume_ckpt:
+        resume_param = mindspore.load_checkpoint(config.resume_ckpt, choice_func=\
+            lambda x: not x.startswith(('learning_rate', 'global_step')))
+        config.start_epoch = int(resume_param.get('epoch_num', mindspore.Tensor(0, mindspore.int32)).asnumpy().item())
+
+    lr = mindspore.Tensor(init_lr(step_size=step_size))
+    # define opt
+    group_params = init_group_params(net, config)
+    opt = nn.Momentum(group_params, lr, config.momentum, loss_scale=config.loss_scale)
+    if config.optimizer == "LARS":
+        opt = nn.LARS(opt, epsilon=config.lars_epsilon, coefficient=config.lars_coefficient,
+                      lars_filter=lambda x: 'beta' not in x.name and 'gamma' not in x.name and 'bias' not in x.name)
+    loss = init_loss_scale()
+    loss_scale = mindspore.FixedLossScaleManager(config.loss_scale, drop_overflow_update=False)
+    dist_eval_network = ClassifyCorrectCell(net) if config.run_distribute else None
+    metrics = {"acc"}
+    if config.run_distribute:
+        metrics = {'acc': DistAccuracy(batch_size=config.batch_size, device_num=config.device_num)}
+    if (config.net_name not in ("resnet18", "resnet34", "resnet50", "resnet101", "resnet152", "se-resnet50")) or \
+        config.parameter_server or target == "CPU":
+        # fp32 training
+        model = mindspore.Model(net, loss_fn=loss, optimizer=opt, metrics=metrics, eval_network=dist_eval_network)
+    else:
+        model = mindspore.Model(net, loss_fn=loss, optimizer=opt, loss_scale_manager=loss_scale, metrics=metrics,
+                                amp_level="O3", boost_level=config.boost_mode, eval_network=dist_eval_network,
+                                boost_config_dict={"grad_freeze": {"total_steps": config.epoch_size * step_size}})
+
+    if config.optimizer == "Thor" and config.dataset == "imagenet2012":
+        from src.lr_generator import get_thor_damping
+        damping = get_thor_damping(step_size * config.start_epoch, config.damping_init, config.damping_decay, 70,
+                                   step_size)
+        split_indices = [26, 53]
+        opt = nn.thor(net, lr, mindspore.Tensor(damping), config.momentum, config.weight_decay, config.loss_scale,
+                      config.batch_size, split_indices=split_indices, frequency=config.frequency)
+        model = ConvertModelUtils().convert_to_thor_model(model=model, network=net, loss_fn=loss, optimizer=opt,
+                                                          loss_scale_manager=loss_scale, metrics={'acc'},
+                                                          amp_level="O3")
+        config.run_eval = False
+        config.logger.warning("Thor optimizer not support evaluation while training.")
+
+    # load resume param
+    if config.resume_ckpt:
+        mindspore.load_param_into_net(net, resume_param)
+        mindspore.load_param_into_net(opt, resume_param)
+        config.logger.info('resume train from epoch: %s', config.start_epoch)
+
+    # define callbacks
+    loss_cb = LossCallBack(config.epoch_size, config.logger, lr, per_print_time=10)
+    resume_cb = ResumeCallback(config.start_epoch)
+    cb = [loss_cb, resume_cb]
+    if config.save_checkpoint and config.rank_id == 0:
+        ckpt_append_info = [{"epoch_num": 0, "step_num": 0}]
+        config_ck = CheckpointConfig(save_checkpoint_steps=config.save_checkpoint_epochs * step_size,
+                                     keep_checkpoint_max=config.keep_checkpoint_max,
+                                     append_info=ckpt_append_info)
+        ckpt_cb = ModelCheckpoint(prefix=config.net_name, directory=config.save_ckpt_dir, config=config_ck)
+        cb += [ckpt_cb]
+
+    if config.run_eval:
+        eval_dataset = create_dataset(dataset_path=config.eval_dataset_path, do_train=False,
+                                      batch_size=config.batch_size, train_image_size=config.train_image_size,
+                                      eval_image_size=config.eval_image_size,
+                                      target=target, enable_cache=config.enable_cache,
+                                      cache_session_id=config.cache_session_id)
+        eval_cb = eval_callback(model, config, eval_dataset)
+        cb.append(eval_cb)
+
+    aislog.init("training", os.getenv("AIS_BENCH_LOGGING_DIR"))
+    all_data_sum = step_size * config.batch_size * config.epoch_size
+
+    model.build(dataset, None, sink_size=dataset.get_dataset_size(), epoch=config.epoch_size)
+    # train model
+    if config.net_name == "se-resnet50":
+        config.epoch_size = config.train_epoch_size
+    dataset_sink_mode = (not config.parameter_server) and target != "CPU"
+    config.logger.save_args(config)
+
+    aislog.start("train", all_data_sum)
+    model.train(config.epoch_size - config.start_epoch, dataset, callbacks=cb,
+                sink_size=dataset.get_dataset_size(), dataset_sink_mode=dataset_sink_mode)
+    aislog.end("train", all_data_sum)
+    aislog.event("result", "OK")
+    aislog.finish()
+
+    config.logger.info("If run eval and enable_cache Remember to shut down the cache server via \"cache_admin --stop\"")
+
+
+if __name__ == '__main__':
+    train_net()
diff --git a/huawei/pytorch/mindspeed-llm/llama2-13B-8p/benchmark.sh b/huawei/pytorch/mindspeed-llm/llama2-13B-8p/benchmark.sh
new file mode 100644
index 0000000000000000000000000000000000000000..2e6625697c3b023e0ddfb53e5be8dd8ee88d905b
--- /dev/null
+++ b/huawei/pytorch/mindspeed-llm/llama2-13B-8p/benchmark.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+declare -i ret_ok=0
+declare -i ret_error=1
+
+CUR_DIR=$(cd "$(dirname "$0")";pwd)
+
+MINDSPEED_LLM_CODE_PATH="/path/to/MindSpeed-LLM"
+export AIS_BENCH_LOGGING_DIR="${CUR_DIR}/../logging_result"
+
+LAUNCH_SCRIPT_PATH="${MINDSPEED_LLM_CODE_PATH}/examples/legacy/llama2/pretrain_llama2_13B_ptd_8p.sh"
+TRAIN_CODE_PATH="${MINDSPEED_LLM_CODE_PATH}/mindspeed_llm/training/training.py"
+
+
+export AIS_BENCH_CKPT_SAVE_DIR="/path/to/ckpt/save/dir"
+export AIS_BENCH_DATA_PATH="/path/to/dataset"
+export AIS_BENCH_TOKENIZER_MODEL="/path/to/tokenizer"
+export AIS_BENCH_CKPT_LOAD_DIR="/path/to/ckpt/load/dir"
+export AIS_BENCH_TRAIN_ITERS=10
+
+function replace_mindspeed_llm_script()
+{
+    if [ -d ${AIS_BENCH_LOGGING_DIR} ];then
+        rm -rf ${AIS_BENCH_LOGGING_DIR}
+    fi
+    mkdir -p ${AIS_BENCH_LOGGING_DIR}
+
+    if [ ! -d ${MINDSPEED_LLM_CODE_PATH} ];then
+        echo "origin_code_path:${MINDSPEED_LLM_CODE_PATH} not found!"
+        return $ret_error
+    fi
+
+    cp "${CUR_DIR}/code_to_replace/pretrain_llama2_13B_ptd_8p.sh" ${LAUNCH_SCRIPT_PATH} || { echo "cp failed";return $ret_error; }
+
+    cp "${CUR_DIR}/code_to_replace/training.py" ${TRAIN_CODE_PATH} || { echo "cp failed";return $ret_error; }
+
+    return $ret_ok
+}
+
+function launch_run()
+{
+    cd ${MINDSPEED_LLM_CODE_PATH}
+    bash ${LAUNCH_SCRIPT_PATH} || { echo "run train failed!";return $ret_error; }
+    cd ${CUR_DIR}
+    return $ret_ok
+}
+
+function collect_result()
+{
+    python3 -c "from ais_bench.logging import collect_report; collect_report('training', ['${AIS_BENCH_LOGGING_DIR}'])"
+}
+
+function main()
+{
+    replace_mindspeed_llm_script || { echo "replace_launch_script failed";return $ret_error; }
+    launch_run || { echo "launch_run failed!";return $ret_error; }
+    collect_result || { echo "collect train result failed!";return $ret_error; }
+    return $ret_ok
+}
+
+main "$@"
+exit $?
\ No newline at end of file
diff --git a/huawei/pytorch/mindspeed-llm/llama2-13B-8p/build.sh b/huawei/pytorch/mindspeed-llm/llama2-13B-8p/build.sh
new file mode 100644
index 0000000000000000000000000000000000000000..f4554f0bc38bb28f13c3176e530016e83575b9f8
--- /dev/null
+++ b/huawei/pytorch/mindspeed-llm/llama2-13B-8p/build.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+declare -i ret_ok=0
+declare -i ret_failed=1
+
+CURRENT_DIR=$(cd "$(dirname "$0")";pwd)
+OUTPUT_DIR=${CURRENT_DIR}/output
+ARCH=$(uname -i)
+STUBS_DIR_NAME=Ais-Benchmark-Stubs-${ARCH}
+LOGGING_RESOURCE=https://gitee.com/aisbench/logging/releases/download/v2.1_alpha5/ais_bench_logging-2.1-py3-none-linux_${ARCH}.whl
+
+function clean_cached() {
+    if [ -d ${OUTPUT_DIR} ];then
+        rm -rf ${OUTPUT_DIR}
+    fi
+    mkdir -p ${OUTPUT_DIR} || { echo "mkdir output failed!";return ${ret_failed}; }
+    return ${ret_ok}
+}
+
+function unpack_stubs_pkg() {
+    pkg_path=$1
+    pkg_name=$(basename "${pkg_path}")
+    cp ${pkg_path} ${OUTPUT_DIR} || { echo "cp stubs pkg failed!"; return ${ret_failed}; }
+    cd ${OUTPUT_DIR}
+    tar -xzf ${pkg_name} || { echo "unpack stubs pkg failed!"; return ${ret_failed}; }
+    cd ${CURRENT_DIR}
+    return ${ret_ok}
+}
+
+function add_logging_whl() {
+    cd ${OUTPUT_DIR}/${STUBS_DIR_NAME}
+    mkdir -p pkgs || { echo "mkdir pkgs dir failed!";return ${ret_failed}; }
+    cd ${OUTPUT_DIR}/${STUBS_DIR_NAME}/pkgs
+    wget -t 0 ${LOGGING_RESOURCE} --no-check-certificate || { echo "wget logging whl failed!";return ${ret_failed}; }
+    cd ${CURRENT_DIR}
+    return ${ret_ok}
+}
+
+function code_replace() {
+    cd ${OUTPUT_DIR}/${STUBS_DIR_NAME}/code
+    rm -f ./*
+    cp ${CURRENT_DIR}/benchmark.sh ./
+    cp -r ${CURRENT_DIR}/code_to_replace ./
+    cd ${CURRENT_DIR}
+}
+
+function pack_pkg() {
+    cd ${OUTPUT_DIR}/
+    tar cvzf Ais-Benchmark-Stubs-${ARCH}-mindspeed-llama2-13b-train.tar.gz Ais-Benchmark-Stubs-${ARCH} || { echo "pack pkg failed!";return ${ret_failed}; }
+    cd ${CURRENT_DIR}
+    return ${ret_ok}
+}
+
+function main() {
+    stubs_pkg_path=$1
+
clean_cached + unpack_stubs_pkg ${stubs_pkg_path} || { return ${ret_failed}; } + add_logging_whl || { return ${ret_failed}; } + code_replace + pack_pkg || { return ${ret_failed}; } + return ${ret_ok} +} + +main "$@" +exit $? diff --git a/huawei/pytorch/mindspeed-llm/llama2-13B-8p/code_to_replace/pretrain_llama2_13B_ptd_8p.sh b/huawei/pytorch/mindspeed-llm/llama2-13B-8p/code_to_replace/pretrain_llama2_13B_ptd_8p.sh new file mode 100644 index 0000000000000000000000000000000000000000..16266ff24d2a75db09490c1e76da1ce711c2a296 --- /dev/null +++ b/huawei/pytorch/mindspeed-llm/llama2-13B-8p/code_to_replace/pretrain_llama2_13B_ptd_8p.sh @@ -0,0 +1,96 @@ +#!/bin/bash +export CUDA_DEVICE_MAX_CONNECTIONS=1 +export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True + +GPUS_PER_NODE=8 +MASTER_ADDR=localhost +MASTER_PORT=6000 +NNODES=1 +NODE_RANK=0 +WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES)) + +CKPT_SAVE_DIR="${AIS_BENCH_CKPT_SAVE_DIR}" +DATA_PATH="${AIS_BENCH_DATA_PATH}" +TOKENIZER_MODEL="${AIS_BENCH_TOKENIZER_MODEL}" +CKPT_LOAD_DIR="${AIS_BENCH_CKPT_LOAD_DIR}" +TP=8 +PP=1 + +DISTRIBUTED_ARGS=" + --nproc_per_node $GPUS_PER_NODE \ + --nnodes $NNODES \ + --node_rank $NODE_RANK \ + --master_addr $MASTER_ADDR \ + --master_port $MASTER_PORT +" + +GPT_ARGS=" + --tensor-model-parallel-size ${TP} \ + --pipeline-model-parallel-size ${PP} \ + --sequence-parallel \ + --num-layers 40 \ + --hidden-size 5120 \ + --ffn-hidden-size 13824 \ + --num-attention-heads 40 \ + --tokenizer-type Llama2Tokenizer \ + --tokenizer-model ${TOKENIZER_MODEL} \ + --seq-length 4096 \ + --max-position-embeddings 4096 \ + --micro-batch-size 4 \ + --global-batch-size 512 \ + --make-vocab-size-divisible-by 1 \ + --lr 1e-6 \ + --train-iters ${AIS_BENCH_TRAIN_ITERS} \ + --lr-decay-style cosine \ + --untie-embeddings-and-output-weights \ + --disable-bias-linear \ + --attention-dropout 0.0 \ + --init-method-std 0.01 \ + --hidden-dropout 0.0 \ + --position-embedding-type rope \ + --normalization RMSNorm \ + --use-fused-rmsnorm 
\
+    --swiglu \
+    --use-flash-attn \
+    --no-masked-softmax-fusion \
+    --attention-softmax-in-fp32 \
+    --min-lr 1e-8 \
+    --weight-decay 1e-1 \
+    --lr-warmup-fraction 0.01 \
+    --clip-grad 1.0 \
+    --adam-beta1 0.9 \
+    --initial-loss-scale 4096 \
+    --adam-beta2 0.95 \
+    --no-gradient-accumulation-fusion \
+    --no-load-optim \
+    --no-load-rng \
+    --use-fused-swiglu \
+    --use-fused-rotary-pos-emb \
+    --use-mc2 \
+    --bf16
+"
+
+# Resume from a checkpoint only when a load dir was provided.
+if [ "${CKPT_LOAD_DIR}" != "" ];then
+    GPT_ARGS="${GPT_ARGS} --load ${CKPT_LOAD_DIR}"
+fi
+
+DATA_ARGS="
+    --data-path $DATA_PATH \
+    --split 949,50,1
+"
+
+OUTPUT_ARGS="
+    --log-interval 1 \
+    --save-interval 10000 \
+    --eval-interval 1000 \
+    --eval-iters 10 \
+"
+
+# Fixed: create the log directory up front (tee fails if it is missing) and
+# enable pipefail so a failed training run is not masked by tee's exit 0.
+mkdir -p logs
+set -o pipefail
+# NOTE(review): 'python -m torch.distributed.launch' is deprecated upstream in
+# favour of torchrun — confirm the installed torch version before switching.
+python -m torch.distributed.launch $DISTRIBUTED_ARGS pretrain_gpt.py \
+    $GPT_ARGS \
+    $DATA_ARGS \
+    $OUTPUT_ARGS \
+    --distributed-backend nccl \
+    --jit-compile \
+    --save $CKPT_SAVE_DIR \
+    | tee logs/train_llama2_13b.log \ No newline at end of file diff --git a/huawei/pytorch/mindspeed-llm/llama2-13B-8p/code_to_replace/training.py b/huawei/pytorch/mindspeed-llm/llama2-13B-8p/code_to_replace/training.py new file mode 100644 index 0000000000000000000000000000000000000000..69818d61359ad675b147c92165ad40cc2a6a2010 --- /dev/null +++ b/huawei/pytorch/mindspeed-llm/llama2-13B-8p/code_to_replace/training.py @@ -0,0 +1,670 @@ +# coding=utf-8
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2024, HUAWEI CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import gc +import sys +import json +from functools import wraps + +import time + +# The earliest we can measure the start time. +_TRAIN_START_TIME = time.time() + +import torch +import torch_npu + +from megatron.training import get_args +from megatron.training import get_timers +from megatron.training import update_num_microbatches +from megatron.training import get_signal_handler +from megatron.training import get_tensorboard_writer +from megatron.training import get_wandb_writer +from megatron.core import mpu +from megatron.core.utils import get_model_config +from megatron.core.enums import ModelType +from megatron.training.checkpointing import save_checkpoint +from megatron.training.initialize import initialize_megatron +from megatron.training.initialize import write_args_to_tensorboard +from megatron.training.arguments import core_transformer_config_from_args +from megatron.training.training import ( + train_step, get_num_microbatches, calc_params_l2_norm, + training_log, evaluate_and_print_results, + save_checkpoint_and_time, print_datetime, + num_floating_point_operations, get_one_logger, + append_to_progress_log, build_train_valid_test_data_iterators +) +import megatron.training.utils +from megatron.training.utils import ( + check_adlr_autoresume_termination, + print_rank_0 +) +from megatron.core.distributed import DistributedDataParallel as DDP +from megatron.core.distributed import finalize_model_grads +from mindspeed_llm.training.initialize import set_jit_fusion_options +from mindspeed_llm.tasks.finetune.lora.utils import is_enable_lora +import ais_bench.logging as aislog + + +def model_provider_func_wrapper(model_provider_func): + @wraps(model_provider_func) + def wrapper(*args, **kwargs): + model = model_provider_func(*args, **kwargs) + args = get_args() + if args.use_fused_mlp: + from mindspeed_llm.tasks.models.transformer.fast_mlp 
import ParallelSwigluMLPForward + from megatron.legacy.model.transformer import ParallelMLP + from megatron.core.transformer.mlp import MLP + ParallelMLP.forward = ParallelSwigluMLPForward + MLP.forward = ParallelSwigluMLPForward + + if is_enable_lora(): + import peft + from packaging import version + from peft import LoraConfig, get_peft_model, PeftModel, LoraModel + if version.parse(peft.__version__) <= version.parse('0.11.1'): + setattr(peft.tuners.lora.LoraLayer, 'merge', peft.tuners.lora.Linear.merge) + setattr(peft.tuners.lora.LoraLayer, 'unmerge', peft.tuners.lora.Linear.unmerge) + setattr(peft.tuners.lora.LoraLayer, 'get_delta_weight', peft.tuners.lora.Linear.get_delta_weight) + from peft.tuners.lora import tp_layer + from mindspeed_llm.tasks.finetune.lora.lora_moe import LoraParallelLinearMoE + tp_layer.LoraParallelLinear = LoraParallelLinearMoE + + if hasattr(args, 'lora_fusion') and args.lora_fusion: + from peft.tuners.lora.tp_layer import LoraParallelLinear + from mindspeed_llm.tasks.finetune.lora.cc_lora_forward import CCLoraParallelLinearForward + LoraParallelLinear.forward = CCLoraParallelLinearForward + + config = core_transformer_config_from_args(args) + lora_config = LoraConfig( + r=args.lora_r, + lora_alpha=args.lora_alpha, + target_modules=args.lora_target_modules, + lora_dropout=0.0, + bias="none", + megatron_config=config, + megatron_core="megatron.core", + ) + + model = get_peft_model(model, lora_config) + model.add_module('module', model.get_base_model()) + + def _hook(_module, _x_in, _x_out): + """ Extract the feature map of model""" + _x_out.requires_grad_(True) + + def _create_hooks(_model, layer): + """ Make the hooks function""" + for name, module in _model.named_modules(): + if isinstance(module, megatron.core.tensor_parallel.layers.VocabParallelEmbedding): + _name = name.split('.')[-1] + if _name in layer: + module.register_forward_hook(_hook) + + if args.recompute_method == 'block' and args.recompute_granularity == 'full': + 
_create_hooks(model, args.lora_register_forward_hook) + + model.print_trainable_parameters() + for module in model.modules(): + # LoRA Linear Layer need all reduce + if isinstance(module, torch.nn.Linear): + setattr(module.weight, 'sequence_parallel', config.sequence_parallel) + # Other layers if is frozen, do not need all reduce. + for param in module.parameters(): + if not param.requires_grad and hasattr(param, 'sequence_parallel'): + delattr(param, 'sequence_parallel') + + megatron.training.utils.ALL_MODULE_WRAPPER_CLASSNAMES = tuple( + list(megatron.training.utils.ALL_MODULE_WRAPPER_CLASSNAMES) + [PeftModel, LoraModel] + ) + + return model + + return wrapper + + +def get_model_wrapper(fn): + @wraps(fn) + def wrapper(model_provider_func, model_type=ModelType.encoder_or_decoder, wrap_with_ddp=True): + model_provider_func = model_provider_func_wrapper(model_provider_func) + model = fn(model_provider_func, model_type, wrap_with_ddp) + return model + + return wrapper + + +def is_profile_enabled(): + args = get_args() + if not args.profile: + return False + if args.profile_ranks == [-1]: + return True + if torch.distributed.get_rank() in args.profile_ranks: + return True + return False + + +def get_profiler(): + args = get_args() + + if args.profile_level == 'level0': + profiler_level = torch_npu.profiler.ProfilerLevel.Level0 + elif args.profile_level == 'level1': + profiler_level = torch_npu.profiler.ProfilerLevel.Level1 + elif args.profile_level == 'level2': + profiler_level = torch_npu.profiler.ProfilerLevel.Level2 + else: + raise ValueError(f"profiler_level only supports level0," + f" 1, and 2, but gets {args.profile_level}") + + experimental_config = torch_npu.profiler._ExperimentalConfig( + aic_metrics=torch_npu.profiler.AiCMetrics.PipeUtilization, + profiler_level=profiler_level, + ) + skip_first = args.profile_step_start + active = args.profile_step_end - args.profile_step_start + + activites = [torch_npu.profiler.ProfilerActivity.NPU] + if 
args.profile_with_cpu: + activites.append(torch_npu.profiler.ProfilerActivity.CPU) + + prof = torch_npu.profiler.profile( + with_stack=args.profile_with_stack, + record_shapes=args.profile_record_shapes, + profile_memory=args.profile_with_memory, + activities=activites, + schedule=torch_npu.profiler.schedule(wait=0, warmup=1, active=active, repeat=1, skip_first=skip_first), + on_trace_ready=torch_npu.profiler.tensorboard_trace_handler(args.profile_save_path), + experimental_config=experimental_config) + + prof.add_metadata_json('distributed_args', json.dumps({ + 'tensor_model_parallel_size': args.tensor_model_parallel_size, + 'pipeline_model_parallel_size': args.pipeline_model_parallel_size, + 'data_parallel_size': args.data_parallel_size, + 'context_parallel_size': args.context_parallel_size, + 'expert_model_parallel_size': args.expert_model_parallel_size, + 'sequence_parallel': args.sequence_parallel, + 'rank': args.rank, + 'world_size': args.world_size + })) + return prof + + +def setup_model_and_optimizer_wrapper(fn): + @wraps(fn) + def wrapper(*args, **kwargs): + model, optimizer, opt_param_scheduler = fn(*args, **kwargs) + argument = get_args() + if argument.enable_high_availability and hasattr(optimizer, "set_current_step"): + optimizer.set_current_step(argument.iteration) + return model, optimizer, opt_param_scheduler + + return wrapper + + +def build_train_args(*input_args): + args, timers, train_valid_test_dataset_provider, model_provider, model_type, forward_step_func, process_non_loss_data_func = input_args + + from megatron.training.training import setup_model_and_optimizer + # Model, optimizer, and learning rate. 
+ timers('model-and-optimizer-setup', log_level=0).start(barrier=True) + model, optimizer, opt_param_scheduler = setup_model_and_optimizer( + model_provider, model_type) + + timers('model-and-optimizer-setup').stop() + print_datetime('after model, optimizer, and learning rate ' + 'scheduler are built') + config = get_model_config(model[0]) + + # Data stuff. + timers('train/valid/test-data-iterators-setup', log_level=0).start( + barrier=True) + if args.virtual_pipeline_model_parallel_size is not None: + train_data_iterator = [] + valid_data_iterator = [] + test_data_iterator = [] + for i in range(len(model)): + mpu.set_virtual_pipeline_model_parallel_rank(i) + iterators = build_train_valid_test_data_iterators( + train_valid_test_dataset_provider) + train_data_iterator.append(iterators[0]) + valid_data_iterator.append(iterators[1]) + test_data_iterator.append(iterators[2]) + else: + train_data_iterator, valid_data_iterator, test_data_iterator \ + = build_train_valid_test_data_iterators( + train_valid_test_dataset_provider) + timers('train/valid/test-data-iterators-setup').stop() + print_datetime('after dataloaders are built') + + # Print setup timing. + print_rank_0('done with setup ...') + timers.log(['model-and-optimizer-setup', + 'train/valid/test-data-iterators-setup'], barrier=True) + + train_args = [forward_step_func, + model, optimizer, opt_param_scheduler, + train_data_iterator, valid_data_iterator, process_non_loss_data_func, config] + test_data_iterator_list = [test_data_iterator] + return train_args, test_data_iterator_list + + +def pretrain(train_valid_test_dataset_provider, + model_provider, + model_type, + forward_step_func, + process_non_loss_data_func=None, + extra_args_provider=None, + args_defaults={}): + """Main training program. + + This function will run the followings in the order provided: + 1) initialize Megatron. + 2) setup model, optimizer and lr schedule using the model_provider. 
+ 3) call train_val_test_data_provider to get train/val/test datasets. + 4) train the modle using the forward_step_func. + + Args: + train_valid_test_dataset_provider: a function that takes the size of + train/valid/test dataset and returns `train, valid, test` datasets. + model_provider: a function that returns a vanilla version of the + model. By vanilla we mean a simple model on cpu with no fp16 or ddp. + model_type: an enum that specifies the type of model being trained. + forward_step_func: a function that takes a `data iterator` and `model`, + and returns a `loss` scalar with a dictionary with key:values being + the info we would like to monitor during training, for example + `lm-loss: value`. We also require that this function add + `batch generator` to the timers class. + process_non_loss_data_func: a function to post process outputs of the + network. It can be used for dumping output tensors (e.g images) to + tensorboard. It takes `collected data`(list of tensors), + `current iteration index` and `tensorboard writer` as arguments. + extra_args_provider: a function that takes a parser and adds arguments + to it. It is used for programs to add their own arguments. + args_defaults: a dictionary from argument-name to argument-value. It + to set already parse arguments. + """ + + # Initalize and get arguments, timers, and Tensorboard writer. + initialize_megatron(extra_args_provider=extra_args_provider, + args_defaults=args_defaults) + + args = get_args() + timers = get_timers() + + logging_dir = os.getenv("AIS_BENCH_LOGGING_DIR") + aislog.init("training", logging_dir) + AISBENCH_TRAIN_TOTAL_TOKENS = int(args.train_iters * args.global_batch_size * args.seq_length / int(os.environ['WORLD_SIZE'])) + + if args.enable_high_availability: + raise AssertionError("High availability feature do not support core_r0.7.0") + + if args.log_progress: + append_to_progress_log("Starting job") + + # Set pytorch JIT layer fusion options and warmup JIT functions. 
+ set_jit_fusion_options() + + # Adjust the startup time so it reflects the largest value. + # This will be closer to what scheduler will see (outside of + # image ... launches. + global _TRAIN_START_TIME + start_time_tensor = torch.tensor([_TRAIN_START_TIME], + dtype=torch.float, + device='cuda') + torch.distributed.all_reduce(start_time_tensor, + op=torch.distributed.ReduceOp.MIN) + _TRAIN_START_TIME = start_time_tensor.item() + print_rank_0('time to initialize megatron (seconds): {:.3f}'.format( + time.time() - _TRAIN_START_TIME)) + print_datetime('after megatron is initialized') + + args = get_args() + timers = get_timers() + + one_logger = get_one_logger() + if one_logger: + one_logger.log_metrics({ + 'train_iterations_warmup': 5 + }) + + train_args, test_data_iterator_list = build_train_args(args, timers, train_valid_test_dataset_provider, + model_provider, + model_type, forward_step_func, process_non_loss_data_func) + + if not args.do_train and not args.do_valid and not args.do_test: + raise RuntimeError('no data loaded, you might give wrong data path.') + + if not args.skip_train: + print_rank_0('training ...') + + if args.dataloader_type == 'cyclic' and args.retro_project_dir: + assert args.retro_cyclic_train_iters is not None + args.train_iters = args.retro_cyclic_train_iters + print_rank_0("retro cyclic train iters : %d" % args.train_iters) + + iteration = 0 + if args.do_train and args.train_iters > 0: + if args.enable_high_availability: + from mindio_ttp.adaptor import tft_init_controller_processor, tft_register_processor, tft_train + tft_init_controller_processor(enable_tls=False, tls_option_top_path='') + tft_register_processor(train_valid_test_dataset_provider, model_provider, model_type) + iteration, num_floating_point_operations_so_far = tft_train(train_args, test_data_iterator_list) + else: + aislog.start("train", AISBENCH_TRAIN_TOTAL_TOKENS) + iteration, num_floating_point_operations_so_far = train(*train_args) + aislog.end("train", 
AISBENCH_TRAIN_TOTAL_TOKENS) + aislog.event("result", "OK") + aislog.finish() + + test_data_iterator = test_data_iterator_list[0] + forward_step_func, model, optimizer, opt_param_scheduler, train_data_iterator, valid_data_iterator, process_non_loss_data_func, config = train_args + + print_datetime('after training is done') + + if args.save and iteration != 0 and iteration % args.save_interval != 0: + save_checkpoint(iteration, model, optimizer, opt_param_scheduler, + num_floating_point_operations_so_far) + else: + print_rank_0('skipping training (--skip-train is on) ...') + + iteration = args.iteration + + if args.do_valid: + prefix = f'iteration {iteration} on validation set' + evaluate_and_print_results(prefix, forward_step_func, + valid_data_iterator, model, + iteration, process_non_loss_data_func, config, + verbose=True, write_to_tensorboard=not args.skip_train) + + if args.do_test: + prefix = f'iteration {iteration} on test set' + evaluate_and_print_results(prefix, forward_step_func, + test_data_iterator, model, + iteration, process_non_loss_data_func, config, + verbose=True, write_to_tensorboard=not args.skip_train) + + +def train(forward_step_func, model, optimizer, opt_param_scheduler, + train_data_iterator, valid_data_iterator, + process_non_loss_data_func, config): + """Train the model function.""" + args = get_args() + timers = get_timers() + + # Write args to tensorboard + write_args_to_tensorboard() + + # Turn on training mode which enables dropout. + for model_module in model: + model_module.train() + + # Tracking loss. + total_loss_dict = {} + + # Iterations. 
+ iteration = args.iteration + one_logger = get_one_logger() + if one_logger: + iteration_start = iteration + train_samples_start = args.consumed_train_samples + train_samples_target = args.train_samples + one_logger.log_metrics({ + 'train_samples_start': args.consumed_train_samples, + 'train_iterations_start': iteration, + 'train_samples_target': train_samples_target, + 'train_iterations_target': args.train_iters, + }) + + num_floating_point_operations_so_far = 0 + + # Setup some training config params + config.grad_scale_func = optimizer.scale_loss + config.timers = timers + if isinstance(model[0], DDP) and args.overlap_grad_reduce and config.no_sync_func is None: + assert config.no_sync_func is None, \ + ('When overlap_grad_reduce is True, config.no_sync_func must be None; ' + 'a custom no_sync_func is not supported when overlapping grad-reduce') + config.no_sync_func = [model_chunk.no_sync for model_chunk in model] + if len(model) == 1: + config.no_sync_func = config.no_sync_func[0] + if args.delay_grad_reduce: + config.grad_sync_func = [model_chunk.start_grad_sync for model_chunk in model] + if len(model) == 1: + config.grad_sync_func = config.grad_sync_func[0] + if args.overlap_param_gather and args.delay_param_gather: + config.param_sync_func = [lambda x: optimizer.finish_param_sync(model_index, x) + for model_index in range(len(model))] + if len(model) == 1: + config.param_sync_func = config.param_sync_func[0] + config.finalize_model_grads_func = finalize_model_grads + + timers('interval-time', log_level=0).start(barrier=True) + print_datetime('before the start of training step') + report_memory_flag = True + exit = False + + if args.manual_gc: + # Disable the default garbage collector and perform the collection manually. + # This is to align the timing of garbage collection across ranks. + assert args.manual_gc_interval >= 0, \ + 'Manual garbage collection interval should be laerger than or equal to 0.' 
+ gc.disable() + gc.collect() + + num_microbatches = get_num_microbatches() + eval_duration = 0.0 + eval_iterations = 0 + + def track_e2e_metrics(): + # Nested function to track a bunch of E2E APP metrics + if one_logger: + train_duration = timers('interval-time').active_time() # overall_elapsed + train_samples = args.consumed_train_samples - train_samples_start + train_iterations = iteration - iteration_start + train_iterations_time_msecs_avg = ( + train_duration * 1000.0) / train_iterations if train_iterations > 0 else None + if eval_iterations > 0: + validation_iterations_time_msecs_avg = (eval_duration * 1000.0) / eval_iterations + else: + validation_iterations_time_msecs_avg = None + + one_logger.log_metrics({ + 'train_iterations_end': iteration, + 'train_samples_end': args.consumed_train_samples, + 'train_iterations': train_iterations, + 'train_samples': train_samples, + 'train_iterations_time_msecs_avg': train_iterations_time_msecs_avg, + 'validation_iterations_time_msecs_avg': validation_iterations_time_msecs_avg + }) + + if is_profile_enabled(): + prof = get_profiler() + prof.start() + + while iteration < args.train_iters: + + # Update number of microbatches first without consistency check to decide if a + # checkpoint should be saved. If the number of microbatches is different + # from the previous iteration, save a checkpoint. Then run consistency check + # to make sure training configuration is still valid. 
+ update_num_microbatches(args.consumed_train_samples, consistency_check=False) + if get_num_microbatches() != num_microbatches and iteration != 0: + assert get_num_microbatches() > num_microbatches, \ + "number of microbatches should be increasing due to batch size rampup" + save_checkpoint_and_time(iteration, model, optimizer, + opt_param_scheduler, + num_floating_point_operations_so_far, + checkpointing_context=None) + num_microbatches = get_num_microbatches() + update_num_microbatches(args.consumed_train_samples, consistency_check=True) + + args.curr_iteration = iteration + loss_dict, skipped_iter, grad_norm, num_zeros_in_grad = \ + train_step(forward_step_func, + train_data_iterator, + model, + optimizer, + opt_param_scheduler, + config) + iteration += 1 + batch_size = mpu.get_data_parallel_world_size() * \ + args.micro_batch_size * \ + get_num_microbatches() + args.consumed_train_samples += batch_size + num_floating_point_operations_so_far += num_floating_point_operations(args, batch_size) + + # Logging. 
+ loss_scale = optimizer.get_loss_scale().item() + params_norm = None + if args.log_params_norm: + params_norm = calc_params_l2_norm(model) + + if iteration % args.log_interval == 0: + track_e2e_metrics() + + learning_rate = None + decoupled_learning_rate = None + for param_group in optimizer.param_groups: + if param_group['is_decoupled_lr']: + decoupled_learning_rate = param_group['lr'] + else: + learning_rate = param_group['lr'] + report_memory_flag = training_log(loss_dict, total_loss_dict, + learning_rate, + decoupled_learning_rate, + iteration, loss_scale, + report_memory_flag, skipped_iter, + grad_norm, params_norm, num_zeros_in_grad) + + if args.enable_high_availability: + args.num_floating_point_operations_so_far = num_floating_point_operations_so_far + args.iteration = iteration + + # Autoresume + if args.adlr_autoresume and \ + (iteration % args.adlr_autoresume_interval == 0): + check_adlr_autoresume_termination(iteration, model, optimizer, + opt_param_scheduler) + + # Evaluation + if args.eval_interval and iteration % args.eval_interval == 0 and \ + args.do_valid: + timers('interval-time').stop() + if args.use_distributed_optimizer and args.overlap_param_gather: + optimizer.disable_pre_hook() + if args.manual_gc and args.manual_gc_eval: + # Collect all objects. + gc.collect() + prefix = 'iteration {}'.format(iteration) + timers('eval-time', log_level=0).start(barrier=True) + evaluate_and_print_results(prefix, forward_step_func, + valid_data_iterator, model, + iteration, process_non_loss_data_func, + config, False) + eval_duration += timers('eval-time').elapsed() + eval_iterations += args.eval_iters + timers('eval-time').stop() + if args.manual_gc and args.manual_gc_eval: + # Collect only the objects created and used in evaluation. 
+ gc.collect(generation=0) + if args.use_distributed_optimizer and args.overlap_param_gather: + optimizer.enable_pre_hook() + timers('interval-time', log_level=0).start(barrier=True) + + # Checkpointing + saved_checkpoint = False + if args.exit_signal_handler: + signal_handler = get_signal_handler() + if any(signal_handler.signals_received()): + save_checkpoint_and_time(iteration, model, optimizer, + opt_param_scheduler, + num_floating_point_operations_so_far, + checkpointing_context=None) + print_datetime('exiting program after receiving SIGTERM.') + exit = True + break + + if args.save and args.save_interval and \ + iteration % args.save_interval == 0: + timers('interval-time').stop() + save_checkpoint_and_time(iteration, model, optimizer, + opt_param_scheduler, + num_floating_point_operations_so_far, + checkpointing_context=None) + saved_checkpoint = True + timers('interval-time', log_level=0).start(barrier=True) + + # Exiting based on duration + if args.exit_duration_in_mins: + train_time = (time.time() - _TRAIN_START_TIME) / 60.0 + done_cuda = torch.cuda.IntTensor( + [train_time > args.exit_duration_in_mins]) + torch.distributed.all_reduce( + done_cuda, op=torch.distributed.ReduceOp.MAX) + done = done_cuda.item() + if done: + if not saved_checkpoint: + save_checkpoint_and_time(iteration, model, optimizer, + opt_param_scheduler, + num_floating_point_operations_so_far, + checkpointing_context=None) + print_datetime('exiting program after {} minutes'.format(train_time)) + exit = True + break + + # Exiting based on iterations + if args.exit_interval and iteration % args.exit_interval == 0: + if args.save and not saved_checkpoint: + save_checkpoint_and_time(iteration, model, optimizer, + opt_param_scheduler, + num_floating_point_operations_so_far, + checkpointing_context=None) + torch.distributed.barrier() + print_datetime('exiting program at iteration {}'.format(iteration)) + exit = True + break + + if args.manual_gc: + if args.manual_gc_interval != 0 and 
iteration % args.manual_gc_interval == 0: + gc.collect() + + if is_profile_enabled(): + prof.step() + + if is_profile_enabled(): + prof.stop() + + track_e2e_metrics() + + # Flush TensorBoard and WandB writers. + writer = get_tensorboard_writer() + if writer: + writer.flush() + wandb_writer = get_wandb_writer() + if wandb_writer: + wandb_writer.finish() + + # Close out pre-hooks if using distributed optimizer and overlapped param gather. + if args.use_distributed_optimizer and args.overlap_param_gather: + optimizer.disable_pre_hook() + + # If any exit conditions (signal handler, duration, iterations) have been reached, exit. + if exit: + sys.exit() + + return iteration, num_floating_point_operations_so_far \ No newline at end of file