diff --git a/distill.sh b/distill.sh
new file mode 100644
index 000000000..676c3aafd
--- /dev/null
+++ b/distill.sh
@@ -0,0 +1,5 @@
+export CUDA_VISIBLE_DEVICES=6,7
+
+python -m paddle.distributed.launch \
+    ./tools/train.py \
+    -c ./ppfleetx/configs/nlp/gpt/distill_gpt_345M_single_card.yaml
diff --git a/ppfleetx/configs/nlp/gpt/distill_gpt_345M_single_card.yaml b/ppfleetx/configs/nlp/gpt/distill_gpt_345M_single_card.yaml
new file mode 100644
index 000000000..76c39584d
--- /dev/null
+++ b/ppfleetx/configs/nlp/gpt/distill_gpt_345M_single_card.yaml
@@ -0,0 +1,38 @@
+_base_: ./pretrain_gpt_base.yaml
+
+Global:
+  global_batch_size:
+  local_batch_size: 1
+  micro_batch_size: 1
+
+Engine:
+  save_load:
+    ckpt_dir: PaddleFleetX_GPT_345M_220826
+
+Model:
+  module: "GPTDistillModule"
+  vocab_size: 50304
+  hidden_size: 1024
+  num_layers: 24
+  num_attention_heads: 16
+  ffn_hidden_size: 4096
+  hidden_dropout_prob: 0.1
+  attention_probs_dropout_prob: 0.1
+  max_position_embeddings: 1024
+  type_vocab_size: 16
+  initializer_range: 0.02
+  use_recompute: False
+  recompute_granularity:
+  no_recompute_layers:
+
+
+Distributed:
+  dp_degree: 1
+  mp_degree: 1
+  pp_degree: 1
+  sharding:
+    sharding_degree: 1
+    sharding_stage: 1
+    sharding_offload: False
+    reduce_overlap: False
+    broadcast_overlap: False
diff --git a/ppfleetx/configs/nlp/gpt/eval_gpt_345M_single_card.yaml b/ppfleetx/configs/nlp/gpt/eval_gpt_345M_single_card.yaml
index 1b3df70eb..13b39aeed 100644
--- a/ppfleetx/configs/nlp/gpt/eval_gpt_345M_single_card.yaml
+++ b/ppfleetx/configs/nlp/gpt/eval_gpt_345M_single_card.yaml
@@ -6,8 +6,8 @@ Model:
 
 
 Offline_Eval:
-  eval_path: ./wikitext-103/wiki.valid.tokens
-  cloze_eval: False
+  eval_path: lambada_test.jsonl
+  cloze_eval: True
   overlapping_eval: 32
   batch_size: 8
   max_seq_len: 1024
diff --git a/ppfleetx/configs/nlp/gpt/eval_gpt_6.7B_single_card.yaml b/ppfleetx/configs/nlp/gpt/eval_gpt_6.7B_single_card.yaml
new file mode 100644
index 000000000..d20eaf6c8
--- /dev/null
+++ b/ppfleetx/configs/nlp/gpt/eval_gpt_6.7B_single_card.yaml
@@ -0,0 +1,44 @@
+_base_: ./pretrain_gpt_base.yaml
+
+Engine:
+  save_load:
+    ckpt_dir: pretrain_model
+
+Model:
+  module: GPTEvalModule
+  vocab_size: 50304
+  hidden_size: 4096
+  num_layers: 32
+  num_attention_heads: 32
+  ffn_hidden_size:
+  hidden_dropout_prob: 0.1
+  attention_probs_dropout_prob: 0.1
+  max_position_embeddings: 1024
+  type_vocab_size: 16
+  initializer_range: 0.02
+  use_recompute: False
+  recompute_granularity:
+  no_recompute_layers:
+
+
+Distributed:
+  dp_degree: 1
+  mp_degree: 1
+  pp_degree: 1
+  sharding:
+    sharding_degree: 1
+    sharding_stage: 1
+    sharding_offload: False
+    reduce_overlap: False
+    broadcast_overlap: False
+
+
+Offline_Eval:
+  eval_path: lambada_test.jsonl
+  cloze_eval: True
+  overlapping_eval: 32
+  batch_size: 8
+  max_seq_len: 1024
+  logging_freq: 10
+
+
diff --git a/ppfleetx/core/engine/eager_engine.py b/ppfleetx/core/engine/eager_engine.py
index 3bc0bc06c..6f06c8f2f 100644
--- a/ppfleetx/core/engine/eager_engine.py
+++ b/ppfleetx/core/engine/eager_engine.py
@@ -35,6 +35,13 @@ from ppfleetx.utils.version import version_check
 from ppfleetx.utils.export import export_inference_model
 
+import nvidia_smi
+
+nvidia_smi.nvmlInit()
+
+handle = nvidia_smi.nvmlDeviceGetHandleByIndex(6)
+# card index 6 is hardcoded here; there is also a call to get all available card ids, so we could iterate
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
@@ -179,7 +186,7 @@ def configure_optimizers(self):
 
         # distributed configs
         self._distributed = (dist.get_world_size() > 1)
-
+        '''
         if self._distributed:
             self._hcg = fleet.get_hybrid_communicate_group()
             self._dp_group = self._hcg.get_data_parallel_group()
@@ -194,6 +201,8 @@ def configure_optimizers(self):
             self._wrap_with_fleet()
         else:
             self._dp_rank = 0
+        '''
+        self._dp_rank = 0
 
         # using for save/load
         self._load_recovery = {'step': 0, 'epoch': 0, 'rng_state': -1}
@@ -284,7 +293,6 @@ def _train_one_epoch(self,
                 loss = self._fit_impl(batch)
                 train_losses.append(loss)
 
-
                 if (step + 1) % self._logging_freq == 0:
                     # Sync for profile time, delete it may be a little faster
                     paddle.device.cuda.synchronize()
@@ -300,6 +308,7 @@ def _train_one_epoch(self,
                         'loss': sum(numpy_losses) / len(numpy_losses),
                         'lr': self._optimizer.get_lr()
                     }
+                    # if paddle.distributed.get_rank() == 1:
                     self._module.training_step_end(log_dict)
 
                     train_start = time.time()
@@ -333,9 +342,10 @@ def _train_one_epoch(self,
                     }
                     self._module.validation_step_end(log_dict)
 
-                    self._module.model.train()
+                    if paddle.distributed.get_rank() == 1:
+                        self._module.model.train()
 
-                if self._save_steps > 0 and step % self._save_steps == 0:
+                if self._save_steps > 0 and step % self._save_steps == 0 and paddle.distributed.get_rank() == 1:
                     paddle.device.cuda.synchronize()
                     self.save(epoch=epoch_index, step=step)
             else:
@@ -361,7 +371,10 @@ def fit(self, epoch=1, train_data_loader=None, valid_data_loader=None):
             valid_data_loader(DataLoader, None): a collection of :class:`paddle.io.DataLoader`, specifying validation samples.
 
         """
-        self._module.model.train()
+        if paddle.distributed.get_rank() == 0:
+            self._module.model.eval()
+        else:
+            self._module.model.train()
 
         train_cost = 0.0
         train_start = time.time()
@@ -386,7 +399,8 @@ def fit(self, epoch=1, train_data_loader=None, valid_data_loader=None):
             if self._run_mode == 'epoch' and self._eval_freq > 0 and \
                 epoch_index % self._eval_freq == 0:
                 self._evaluate_one_epoch(epoch_index, valid_data_loader)
-                self._module.model.train()
+                if paddle.distributed.get_rank() == 1:
+                    self._module.model.train()
 
                 eval_cost = time.time() - eval_start
                 log_dict = {
                     'epoch': epoch_index,
@@ -416,7 +430,8 @@ def _fit_impl(self, batch):
                                                self._dp_group)
             else:
                 loss = self._model_forward_backward(batch)
-            self._optim_update_params()
+            if paddle.distributed.get_rank() == 1:
+                self._optim_update_params()
         else:
             with paddle.amp.auto_cast(
                     self._use_pure_fp16,
@@ -450,13 +465,14 @@ def _model_forward_backward(self, batch):
                     level='O2'):
                 loss = self._module.training_step(micro_batch)
 
+            if paddle.distributed.get_rank() == 1:
                 loss_bw = self._scaler.scale(loss) if self._use_pure_fp16 else loss
                 self._module.backward(loss_bw)
-            detach_loss = loss.detach()
-            if final_loss is None:
-                final_loss = detach_loss
-            else:
-                final_loss = paddle.add(final_loss, detach_loss)
+                detach_loss = loss.detach()
+                if final_loss is None:
+                    final_loss = detach_loss
+                else:
+                    final_loss = paddle.add(final_loss, detach_loss)
         if self._accumulate_steps > 1:
             final_loss = final_loss / self._accumulate_steps
         return final_loss
@@ -625,6 +641,9 @@ def save(self, epoch=0, step=0):
             logger.info("DP_Rank %d doesn't save model" % self._dp_rank)
             return
 
+        if paddle.distributed.get_rank() == 0:
+            return
+
         if self._output_dir and isinstance(self._output_dir, str):
             output_dir = os.path.join(self._output_dir,
                                       "epoch_%d_step_%d" % (epoch, step))
@@ -632,9 +651,10 @@ def save(self, epoch=0, step=0):
             os.makedirs(output_dir, exist_ok=True)
             logger.info("Save model to %s" % output_dir)
 
-            save_dir = "{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format(
-                output_dir, self._mp_rank, self._sharding_rank,
-                self._pp_rank) if self._distributed else output_dir
+            # save_dir = "{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format(
+            #     output_dir, self._mp_rank, self._sharding_rank,
+            #     self._pp_rank) if self._distributed else output_dir
+            save_dir = output_dir
 
             if self._sharding_stage == 3:
                 self._module.model.get_all_parameters(convert2cpu=False)
@@ -657,55 +677,42 @@ def load(self):
         """
         load the saved checkpoint file and update the state dicts of model and optimizer.
         """
+        if paddle.distributed.get_rank() == 0:
+            self._ckpt_dir = 'pretrain_model/'
+        elif paddle.distributed.get_rank() == 1:
+            self._ckpt_dir = 'PaddleFleetX_GPT_345M_220826/'
+            self._ckpt_dir = 'output/epoch_0_step_10000/'
         if self._ckpt_dir and isinstance(self._ckpt_dir, str):
             logger.info("Try to load checkpoint from %s " % self._ckpt_dir)
-
-            load_dir = "{}/mp_{:0>2d}_sharding_{:0>2d}_pp_{:0>2d}".format(
-                self._ckpt_dir, self._mp_rank, self._sharding_rank,
-                self._pp_rank) if self._distributed else self._ckpt_dir
+            load_dir = self._ckpt_dir
             model_path = os.path.join(load_dir, "model.pdparams")
-            opt_path = os.path.join(load_dir, "model_state.pdopt")
-            meta_path = os.path.join(load_dir, "meta_state.pdopt")
 
             if os.path.exists(model_path):
                 model_dict = paddle.load(model_path)
                 for name, param in self._module.model.state_dict().items():
+                    print('trying to load {}'.format(name))
                     assert name in model_dict.keys(
                     ), "No param named `{}` was found in checkpoint file.".format(
                         name)
 
                     if param.dtype != model_dict[name].dtype:
                         model_dict[name] = model_dict[name].cast(param.dtype)
-
+                    print("load: {}".format(name))
                 self._module.model.set_state_dict(model_dict)
             else:
                 raise ValueError("No optimizer checkpoint file found in %s." %
                                  model_path)
-
-            if self.mode == 'train':
-                if os.path.exists(opt_path):
-                    opt_dict = paddle.load(opt_path)
-                    self._optimizer.set_state_dict(opt_dict)
-                else:
-                    raise ValueError(
-                        "No optimizer checkpoint file found in %s." % opt_path)
-
-                if os.path.exists(meta_path):
-                    meta_dict = paddle.load(meta_path)
-                    self._load_recovery = {
-                        'step': meta_dict['step'],
-                        'epoch': meta_dict['epoch'],
-                        'rng_state': meta_dict['cuda_rng_state']
-                    }
-                else:
-                    raise ValueError("No meta checkpoint file found in %s." %
-                                     meta_path)
-
             logger.info("successfully load checkpoints")
         else:
             logger.warning("`load` requires a valid value of `ckpt_dir`.")
             raise TypeError("`load` requires a valid value of `ckpt_dir`.")
 
+        info = nvidia_smi.nvmlDeviceGetMemoryInfo(handle)
+
+        print("Total memory:", info.total/1024**3, "GB")
+        print("Free memory:", info.free/1024**3, "GB")
+        print("Used memory:", info.used/1024**3, "GB")
+
     def export(self):
         self._module.model.eval()
         input_spec = self._module.input_spec()
diff --git a/ppfleetx/data/dataset/gpt_dataset.py b/ppfleetx/data/dataset/gpt_dataset.py
index 424317181..f370cfbe4 100644
--- a/ppfleetx/data/dataset/gpt_dataset.py
+++ b/ppfleetx/data/dataset/gpt_dataset.py
@@ -54,6 +54,8 @@ def __init__(self,
 
         local_rank = int(os.getenv("PADDLE_RANK_IN_NODE", 0))
 
+
+        '''
         if local_rank == 0:
             try:
                 import ppfleetx.data.data_tools.cpp.fast_index_map_helpers
@@ -68,7 +70,7 @@ def __init__(self,
                     flush=True)
 
         device_world_size = paddle.distributed.get_world_size()
-
+        device_world_size = 1
         if device_world_size > 1 and local_rank != 0:
             while True:
                 try:
@@ -80,12 +82,13 @@ def __init__(self,
 
         try:
             data_world_size = env.get_data_world_size()
-
+            print(data_world_size)
             logger.info(
                 "The distributed run, total device num:{}, distinct dataflow num:{}.".
                 format(device_world_size, data_world_size))
         except AttributeError:
             pass
+        '''
 
 
         assert len(input_dir) == 1, "GPT only support one dataset for now."
diff --git a/ppfleetx/models/__init__.py b/ppfleetx/models/__init__.py
index e0f435ba7..0ec8a0b8f 100644
--- a/ppfleetx/models/__init__.py
+++ b/ppfleetx/models/__init__.py
@@ -16,7 +16,7 @@
 import copy
 
 from ppfleetx.core.module.basic_module import BasicModule
-from ppfleetx.models.language_model.language_module import GPTModule, GPTGenerationModule, GPTEvalModule, GPTFinetuneModule
+from ppfleetx.models.language_model.language_module import GPTModule, GPTGenerationModule, GPTEvalModule, GPTFinetuneModule, GPTDistillModule
 from ppfleetx.models.language_model.gpt.auto.auto_module import GPTModuleAuto, GPTGenerationModuleAuto
 from ppfleetx.models.vision_model.general_classification_module import GeneralClsModule
 from ppfleetx.models.multimodal_model.multimodal_module import ImagenModule
diff --git a/ppfleetx/models/language_model/language_module.py b/ppfleetx/models/language_model/language_module.py
index 2f568583f..a31d35729 100644
--- a/ppfleetx/models/language_model/language_module.py
+++ b/ppfleetx/models/language_model/language_module.py
@@ -120,7 +120,6 @@ def training_epoch_end(self, log_dict):
         logger.info("[Training] epoch: %d, total time: %.5f sec" %
                     (log_dict['epoch'], log_dict['train_cost']))
 
-
 class GPTModule(LanguageModule):
     def __init__(self, configs):
         super(GPTModule, self).__init__(configs)
@@ -128,6 +127,11 @@ def __init__(self, configs):
     def get_model(self):
         model_setting = copy.deepcopy(self.configs.Model)
         model_setting.pop("module")
+        model_setting.pop("name")
+
+        model_name = model_setting.pop("name")
+        tokenizer_class, pretrained_name = MODEL_CLASSES[model_name]
+        self.tokenizer = tokenizer_class.from_pretrained(pretrained_name)
 
         l = model_setting['num_layers']
         h = model_setting['hidden_size']
@@ -135,9 +139,7 @@ def get_model(self):
         s = self.configs.Data.Train.dataset.max_seq_len
         self.get_model_size(l, h, v, s)
 
-        model_name = model_setting.pop("name")
-        tokenizer_class, pretrained_name = MODEL_CLASSES[model_name]
-        self.tokenizer = tokenizer_class.from_pretrained(pretrained_name)
+        self.tokenizer = GPTTokenizer.from_pretrained("gpt2")
 
         if self.nranks == 1:
             model_setting.pop("sequence_parallel")
@@ -188,6 +190,106 @@ def inference_end(self, outputs):
                 # ret_str = text[i] + ret_str
                 print(ret_str)
 
+class GPTDistillModule(LanguageModule):
+    def __init__(self, configs):
+        super(GPTDistillModule, self).__init__(configs)
+        paddle.distributed.init_parallel_env()
+        self.student_group = paddle.distributed.new_group([0, 1])
+
+    def get_model(self):
+        model_setting = copy.deepcopy(self.configs.Model)
+
+        if paddle.distributed.get_rank() == 0:
+            model_setting['hidden_size'] = 4096
+            model_setting['num_layers'] = 32
+            model_setting['num_attention_heads'] = 32
+            model_setting['ffn_hidden_size'] = 16384
+
+
+        model_setting.pop("module")
+        model_setting.pop("sequence_parallel")
+
+        l = model_setting['num_layers']
+        h = model_setting['hidden_size']
+        v = model_setting['vocab_size']
+        s = self.configs.Data.Train.dataset.max_seq_len
+        self.get_model_size(l, h, v, s)
+
+        model_name = model_setting.pop("name")
+        tokenizer_class, pretrained_name = MODEL_CLASSES[model_name]
+        self.tokenizer = tokenizer_class.from_pretrained(pretrained_name)
+
+        if paddle.distributed.get_rank() == 0:
+            with paddle.no_grad():
+                model = gpt.GPTForPretraining(gpt.GPTModel(**model_setting))
+        else:
+            model = gpt.GPTForPretraining(gpt.GPTModel(**model_setting))
+
+
+        if 'Quantization' in self.configs.keys(
+        ) and self.configs.Quantization.enable and paddle.distributed.get_rank() == 0:
+            model = self.qat_model(model)
+
+        return model
+
+    def training_step(self, batch):
+        tokens, position_ids, labels, loss_mask = batch
+
+        loss_mask.stop_gradient = True
+        labels.stop_gradient = True
+        position_ids.stop_gradient = True
+
+        preds = self(tokens, position_ids)
+        preds = paddle.cast(preds, "float32")
+
+        loss = 0.0
+        if paddle.distributed.get_rank() == 0:
+            loss = self.loss_fn(preds, labels, loss_mask)
+            paddle.distributed.broadcast(
+                paddle.to_tensor(preds),
+                src=0,
+                group=self.student_group)
+        elif paddle.distributed.get_rank() == 1:
+            with paddle.amp.auto_cast(enable=False):
+                loss = self.loss_fn(preds, labels, loss_mask)
+            teacher_encoder_output = paddle.zeros(preds.shape)
+            teacher_encoder_output.stop_gradient = True
+            paddle.distributed.broadcast(
+                teacher_encoder_output,
+                src=0,
+                group=self.student_group)
+            with paddle.amp.auto_cast(enable=False):
+                dist_loss = paddle.nn.functional.mse_loss(preds, teacher_encoder_output, reduction='mean')
+                loss = loss + dist_loss
+        return loss
+
+    def get_loss_fn(self):
+        loss_fn = gpt.GPTPretrainingCriterion()
+        return loss_fn
+
+    def pretreating_batch(self, batch):
+        if self.configs.Distributed.pp_degree > 1:
+            tokens, position_ids, labels, loss_mask = batch
+            data = [(tokens, position_ids), (labels, loss_mask)]
+            return data
+        else:
+            return batch
+
+    def input_spec(self):
+        return [
+            InputSpec(
+                shape=[None, None], name="tokens", dtype='int64'), InputSpec(
+                    shape=[None, None], name="ids", dtype='int64')
+        ]
+
+    def inference_end(self, outputs):
+        for k, v in outputs.items():
+            for i in range(v.shape[0]):
+                out_ids = [int(x) for x in v[i]]
+                ret_str = self.tokenizer.decode(out_ids)
+                # ret_str = text[i] + ret_str
+                print(ret_str)
+
 
 class GPTFinetuneModule(BasicModule):
     def __init__(self, configs):
diff --git a/ppfleetx/optims/optimizer.py b/ppfleetx/optims/optimizer.py
index 005e39e87..fd665782d 100644
--- a/ppfleetx/optims/optimizer.py
+++ b/ppfleetx/optims/optimizer.py
@@ -31,7 +31,7 @@ class FusedAdamW(paddle.optimizer.AdamW):
     def __init__(self, learning_rate, parameters, grad_clip, **config):
         tensor_fusion = config.pop("tensor_fusion", False)
 
-        if paddle.distributed.get_world_size() > 1:
+        if False: #paddle.distributed.get_world_size() > 1:
             hcg = fleet.get_hybrid_communicate_group()
             sharding_size = hcg.get_sharding_parallel_world_size()
diff --git a/ppfleetx/utils/config.py b/ppfleetx/utils/config.py
index 9e5998535..005b1e185 100644
--- a/ppfleetx/utils/config.py
+++ b/ppfleetx/utils/config.py
@@ -52,10 +52,10 @@ def process_dist_config(configs):
     assert nranks % other_degree == 0, "unreasonable config of dist_strategy."
     dp_degree = config.setdefault("dp_degree", nranks // other_degree)
     assert nranks % dp_degree == 0, "unreasonable config of dist_strategy."
-    assert nranks == dp_degree * other_degree, \
-        "Mismatched config using {} cards with dp_degree[{}]," \
-        "mp_degree[{}], pp_degree[{}] and sharding_degree[{}]".format(nranks, \
-        dp_degree, mp_degree, pp_degree, _sharding_degree)
+    # assert nranks == dp_degree * other_degree, \
+    #     "Mismatched config using {} cards with dp_degree[{}]," \
+    #     "mp_degree[{}], pp_degree[{}] and sharding_degree[{}]".format(nranks, \
+    #     dp_degree, mp_degree, pp_degree, _sharding_degree)
 
     if sharding_config['sharding_degree'] > 1 and reduce_overlap:
         if sharding_config['sharding_stage'] == 3 or sharding_config[
diff --git a/ppfleetx/utils/env.py b/ppfleetx/utils/env.py
index 588706c5f..7836af460 100644
--- a/ppfleetx/utils/env.py
+++ b/ppfleetx/utils/env.py
@@ -28,7 +28,7 @@ def set_seed(seed):
 
-    if dist.get_world_size() > 1:
+    if False:# dist.get_world_size() > 1:
         # obtain rank message of hybrid parallel
         hcg = fleet.get_hybrid_communicate_group()
         mp_rank = hcg.get_model_parallel_rank()
@@ -102,7 +102,7 @@ def get_local_rank():
 
 def get_data_world_size():
     if paddle.distributed.get_world_size() == 1:
         return 1
-
+    return 1
     hcg = fleet.get_hybrid_communicate_group()
     dp_size = hcg.get_data_parallel_world_size()
     sharding_size = hcg.get_sharding_parallel_world_size()
@@ -113,7 +113,7 @@ def get_data_world_rank():
     if paddle.distributed.get_world_size() == 1:
         return 0
-
+    return 0
     hcg = fleet.get_hybrid_communicate_group()
     dp_rank = hcg.get_data_parallel_rank()
     sharding_rank = hcg.get_sharding_parallel_rank()
diff --git a/tools/eval.py b/tools/eval.py
index 8053268b3..f70def73a 100644
--- a/tools/eval.py
+++ b/tools/eval.py
@@ -46,8 +46,8 @@
     valid_data_loader = build_dataloader(cfg.Data, "Eval")
 
-    if cfg.Engine.save_load.ckpt_dir is not None:
-        engine.load()
+    # if cfg.Engine.save_load.ckpt_dir is not None:
+    engine.load()
 
     engine.evaluate(
         valid_data_loader=valid_data_loader,
         epoch=cfg.Engine.num_train_epochs)
diff --git a/tools/train.py b/tools/train.py
index 20b64e14a..20ac73511 100644
--- a/tools/train.py
+++ b/tools/train.py
@@ -36,8 +36,8 @@
     args = config.parse_args()
    cfg = config.get_config(args.config, overrides=args.override, show=False)
 
-    if dist.get_world_size() > 1:
-        fleet.init(is_collective=True, strategy=env.init_dist_env(cfg))
+    # if dist.get_world_size() > 1:
+    #     fleet.init(is_collective=True, strategy=env.init_dist_env(cfg))
 
     env.set_seed(cfg.Global.seed)
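
A minimal sketch of the cross-rank exchange that GPTDistillModule.training_step wires up above, assuming two GPUs launched with paddle.distributed.launch and a process group over ranks [0, 1]: rank 0 runs the frozen 6.7B teacher and only broadcasts its logits, rank 1 runs the 345M student and adds an MSE term against the received logits on top of its own language-model loss. The names distill_step and lm_loss_fn are illustrative placeholders, not ppfleetx APIs.

# Hypothetical sketch of the two-rank distillation step implemented by the patch.
import paddle
import paddle.distributed as dist

def distill_step(model, lm_loss_fn, tokens, position_ids, labels, loss_mask, group):
    preds = paddle.cast(model(tokens, position_ids), "float32")
    if dist.get_rank() == 0:
        # Teacher rank: compute its own LM loss (for logging) and share the logits.
        loss = lm_loss_fn(preds, labels, loss_mask)
        dist.broadcast(preds.detach(), src=0, group=group)
        return loss
    # Student rank: LM loss plus an MSE penalty against the teacher's logits.
    loss = lm_loss_fn(preds, labels, loss_mask)
    teacher_preds = paddle.zeros(preds.shape, dtype=preds.dtype)
    teacher_preds.stop_gradient = True
    dist.broadcast(teacher_preds, src=0, group=group)
    loss = loss + paddle.nn.functional.mse_loss(preds, teacher_preds, reduction='mean')
    return loss

The broadcast only lines up because both configs keep the same vocab_size (50304) and positional range (1024), so teacher and student logits share a shape; this is also why the engine changes guard backward, _optim_update_params and save() with get_rank() == 1, leaving the teacher rank forward-only.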