I am trying to use FSDP with Accelerate, and I keep running into the same error:
TypeError: FullyShardedDataParallel.__init__() got an unexpected keyword argument 'ignored_parameters'
I run it via NCCL_P2P_DISABLE=1 accelerate launch new.py
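In case library versions are relevant here, this is a quick way to print the ones involved (just the standard __version__ attributes; I can post my exact output if that helps):
import accelerate
import torch
import transformers
print("torch:", torch.__version__)
print("accelerate:", accelerate.__version__)
print("transformers:", transformers.__version__)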
The full error log is at the bottom. The script runs fine without FSDP (using the multi-GPU config further down). Here is my code, followed by the two Accelerate YAML config files.
import html
import json
import os
from time import time
import pandas as pd
import torch
from accelerate import Accelerator
from datasets import Dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
get_linear_schedule_with_warmup,
set_seed,
)
def process_dataset(json_file):
with open(json_file, "r", encoding="utf-8") as file:
data = json.load(file)
def concatenate_messages(group):
conversation = ''
for _, row in group.iterrows():
if row['PCP_MESSAGE'].strip():
conversation += 'PCP: ' + row['PCP_MESSAGE'].strip() + '\n'
if row['SR_MESSAGE'].strip():
conversation += 'SR: ' + row['SR_MESSAGE'].strip() + '\n'
return conversation
df = pd.DataFrame(data)
df['PCP_MESSAGE'] = df['PCP_MESSAGE'].fillna('')
df['SR_MESSAGE'] = df['SR_MESSAGE'].fillna('')
df['PCP_MESSAGE'] = df['PCP_MESSAGE'].apply(html.unescape)
df['SR_MESSAGE'] = df['SR_MESSAGE'].apply(html.unescape)
df.sort_values(by=['CONSULTID', 'CREATED'], inplace=True)
grouped = df.groupby('CONSULTID').apply(
concatenate_messages).reset_index(name='conversation')
return grouped
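# Toy illustration of what process_dataset() returns (made-up rows, not my real data):
#   input JSON: [{"CONSULTID": 1, "CREATED": "2023-01-01", "PCP_MESSAGE": "Hi &amp; thanks", "SR_MESSAGE": ""},
#                {"CONSULTID": 1, "CREATED": "2023-01-02", "PCP_MESSAGE": "", "SR_MESSAGE": "Hello"}]
#   output: one row per CONSULTID, here with conversation == "PCP: Hi & thanks\nSR: Hello\n"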
def get_dataloaders(accelerator: Accelerator, batch_size: int = 16):
tokenizer = AutoTokenizer.from_pretrained("gpt2-xl")
tokenizer.pad_token = tokenizer.eos_token
dataset = Dataset.from_pandas(process_dataset("./asdf.json"))
def tokenize_function(examples):
return tokenizer(examples["conversation"], padding="max_length", truncation=True, max_length=128)
with accelerator.main_process_first():
tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset.set_format(
'torch', columns=['input_ids', 'attention_mask'])
train_size = int(0.8 * len(tokenized_dataset))
test_size = len(tokenized_dataset) - train_size
tokenized_train_dataset, tokenized_eval_dataset = torch.utils.data.random_split(
tokenized_dataset, [train_size, test_size])
# Instantiate dataloaders.
train_dataloader = DataLoader(
tokenized_train_dataset,
shuffle=True,
batch_size=batch_size,
drop_last=True
)
eval_dataloader = DataLoader(
tokenized_eval_dataset,
shuffle=False,
batch_size=batch_size*2,
drop_last=(accelerator.mixed_precision == "fp8"),
)
return train_dataloader, eval_dataloader
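# Rough sanity check of what the loaders yield (shapes only; 128 comes from max_length above):
#   batch = next(iter(train_dataloader))
#   batch["input_ids"].shape      -> torch.Size([batch_size, 128])
#   batch["attention_mask"].shape -> torch.Size([batch_size, 128])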
def training_function():
# Initialize accelerator
accelerator = Accelerator(mixed_precision="fp16")
# Sample hyper-parameters for learning rate, batch size, seed and a few other HPs
lr = 5e-5
num_epochs = 3
seed = 42
batch_size = 1
num_warmup_steps = 100
set_seed(seed)
train_dataloader, eval_dataloader = get_dataloaders(
accelerator, batch_size)
# Instantiate the model (we build it here so that the seed also controls new weight initialization)
model = AutoModelForCausalLM.from_pretrained("gpt2")
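# NOTE: the dataloaders were tokenized with the gpt2-xl tokenizer; all GPT-2 sizes share the
# same vocabulary, so the smaller gpt2 checkpoint here still accepts those input_ids.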
model = accelerator.prepare(model)
optimizer = AdamW(params=model.parameters(), lr=lr)
# Instantiate scheduler
lr_scheduler = get_linear_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=num_warmup_steps,
num_training_steps=(len(train_dataloader) *
num_epochs),
)
# Prepare everything
# There is no specific order to remember; we just need to unpack the objects in the same
# order we passed them to the prepare method.
optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
optimizer, train_dataloader, eval_dataloader, lr_scheduler
)
# Initialize logging variables
total_train_loss = 0
total_eval_loss = 0
# Now we train the model
for epoch in range(num_epochs):
model.train()
total_train_loss = 0
for batch in tqdm(train_dataloader, desc="Training"):
with accelerator.accumulate(model):
# Process the batch
inputs = {k: v.to(accelerator.device)
for k, v in batch.items()}
if "labels" not in inputs:
inputs["labels"] = inputs["input_ids"]
outputs = model(**inputs)
loss = outputs.loss
total_train_loss += loss.item()
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
# Evaluation loop after each training epoch
model.eval()
total_eval_loss = 0
for batch in tqdm(eval_dataloader, desc="Evaluating"):
with torch.no_grad():
inputs = {k: v.to(accelerator.device)
for k, v in batch.items()}
if "labels" not in inputs:
inputs["labels"] = inputs["input_ids"]
outputs = model(**inputs)
loss = outputs.loss
total_eval_loss += loss.item()
# Log the average losses
avg_train_loss = total_train_loss / len(train_dataloader)
avg_eval_loss = total_eval_loss / len(eval_dataloader)
print(
f"Epoch: {epoch}, Average Training Loss: {avg_train_loss}, Average Evaluation Loss: {avg_eval_loss}")
# You can also include other evaluation metrics here if needed
if accelerator.is_main_process:
output_dir = "./accExModel"
os.makedirs(output_dir, exist_ok=True)
# Unwrap the model from its DDP/FSDP wrapper before saving
unwrapped_model = accelerator.unwrap_model(model)
unwrapped_model.save_pretrained(output_dir)
print(f"Model saved to {output_dir}")
def main():
training_function()
if __name__ == "__main__":
start = time()
main()
print(f"Total Execution Time: {time() - start} seconds")
Here is the YAML config file that does not use FSDP (this one works):
compute_environment: LOCAL_MACHINE
distributed_type: MULTI_GPU
downcast_bf16: 'no'
gpu_ids: all
machine_rank: 0
main_training_function: main
mixed_precision: fp16
num_machines: 1
num_processes: 6
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
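For reference, which config the launcher picks up by default can be checked with the standard CLI subcommand (it prints the active default config along with the installed versions):
accelerate env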
Here is the YAML config using FSDP (this is the one that produces the error):
compute_environment: LOCAL_MACHINE
distributed_type: FSDP
downcast_bf16: 'no'
fsdp_config:
fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP
fsdp_backward_prefetch_policy: BACKWARD_PRE
fsdp_forward_prefetch: false
fsdp_offload_params: false
fsdp_sharding_strategy: 2
fsdp_state_dict_type: SHARDED_STATE_DICT
fsdp_sync_module_states: false
fsdp_transformer_layer_cls_to_wrap: GPT2Block
fsdp_use_orig_params: false
machine_rank: 0
main_training_function: main
mixed_precision: fp16
num_machines: 1
num_processes: 6
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
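If it is relevant: my understanding is that fsdp_sharding_strategy: 2 maps to ShardingStrategy.SHARD_GRAD_OP; the enum values can be double-checked with a small snippet like this:
from torch.distributed.fsdp import ShardingStrategy
for strategy in ShardingStrategy:
    print(strategy.value, strategy.name)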
This is the full error I get:
Traceback (most recent call last):
File "/home/username/test/new.py", line 183, in <module>
main()
File "/home/username/test/new.py", line 178, in main
training_function()
File "/home/username/test/new.py", line 102, in training_function
model = accelerator.prepare(model)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/accelerator.py", line 1202, in prepare
result = tuple(
^^^^^^
File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/accelerator.py", line 1203, in <genexpr>
self._prepare_one(obj, first_pass=True, device_placement=d) for obj, d in zip(args, device_placement)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/accelerator.py", line 1030, in _prepare_one
return self.prepare_model(obj, device_placement=device_placement)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/accelerator.py", line 1366, in prepare_model
model = FSDP(model, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
TypeError: FullyShardedDataParallel.__init__() got an unexpected keyword argument 'ignored_parameters'
[2024-01-17 19:07:24,361] torch.distributed.elastic.multiprocessing.api: [ERROR] failed (exitcode: 1) local_rank: 0 (pid: 8982) of binary: /home/username/miniconda3/bin/python
Traceback (most recent call last):
File "/home/username/miniconda3/bin/accelerate", line 11, in <module>
sys.exit(main())
^^^^^^
File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/commands/accelerate_cli.py", line 45, in main
args.func(args)
File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/commands/launch.py", line 966, in launch_command
multi_gpu_launcher(args)
File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/commands/launch.py", line 646, in multi_gpu_launcher
distrib_run.run(args)
File "/home/username/miniconda3/lib/python3.11/site-packages/torch/distributed/run.py", line 797, in run
elastic_launch(
File "/home/username/miniconda3/lib/python3.11/site-packages/torch/distributed/launcher/api.py", line 134, in __call__
return launch_agent(self._config, self._entrypoint, list(args))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^