AttributeError: 'MapDataset' object has no attribute 'client_ids' in tensorflow_federated TFF


I'm trying to test a compression technique in federated learning with non-IID data using the tff.simulation.datasets.build_single_label_dataset() API, following these posts:

But after defining the model and training it, I got this error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-16-b04459984716> in <module>()
     10 
     11 train(federated_averaging_process=federated_averaging, num_rounds=10,
---> 12       num_clients_per_round=NUM_CLIENTS, summary_writer=summary_writer)

<ipython-input-15-7157bce2bb0f> in train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer)
     11       # sample the clients parcitipated in this round.
     12       sampled_clients = np.random.choice(
---> 13           fed_emnist_train.client_ids,
     14           size=num_clients_per_round,
     15           replace=False)

AttributeError: 'MapDataset' object has no attribute 'client_ids'

The code:

import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=False)

# for a non-IID setup we use tff.simulation.datasets.build_single_label_dataset()
fed_emnist_train = tff.simulation.datasets.build_single_label_dataset(
  emnist_train.create_tf_dataset_from_all_clients(),
  label_key='label', desired_label=1)

MAX_CLIENT_DATASET_SIZE = 418

CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500

def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
  return (dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          .map(reshape_emnist_element))

fed_emnist_train = preprocess_train_dataset(fed_emnist_train)

# for an unbalanced setup, give each simulated client a random number of batches
import random
NUM_CLIENTS = 100

client_datasets = [
   fed_emnist_train.take(random.randint(1, CLIENT_BATCH_SIZE))
   for _ in range(NUM_CLIENTS)
]

# defining a model 
def create_original_fedavg_cnn_model(only_digits=False):
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model

input_spec = client_datasets[0].element_spec

def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

# training the model 
federated_averaging = tff.learning.build_federated_averaging_process(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

# utility function to format a bit count in human-readable units
def format_size(size):
  size = float(size)
  for unit in ['bit','Kibit','Mibit','Gibit']:
    if size < 1024.0:
      return "{size:3.2f}{unit}".format(size=size, unit=unit)
    size /= 1024.0
  return "{size:.2f}{unit}".format(size=size, unit='TiB')

def set_sizing_environment():
  sizing_factory = tff.framework.sizing_executor_factory()
  context = tff.framework.ExecutionContext(executor_fn=sizing_factory)
  tff.framework.set_default_context(context)

  return sizing_factory

# trains the federated averaging process and outputs metrics
def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  # create an environment that tracks communication cost
  environment = set_sizing_environment()

  # initialize the FedAvg algorithm to get the initial server state
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # sample the clients participating in this round.
      sampled_clients = np.random.choice(
          fed_emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # create a list of `tf.data.Dataset` instances holding the sampled clients' data
      sampled_train_data = [
          fed_emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      
      state, metrics = federated_averaging_process.next(state, sampled_train_data)

      size_info = environment.get_size_info()
      broadcasted_bits = size_info.broadcast_bits[-1]
      aggregated_bits = size_info.aggregate_bits[-1]

      print('round {:2d}, metrics={}, broadcasted_bits={}, aggregated_bits={}'.format(round_num, metrics, format_size(broadcasted_bits), format_size(aggregated_bits)))

      # add metrics to TensorBoard
      for name, value in metrics['train'].items():
          tf.summary.scalar(name, value, step=round_num)

      tf.summary.scalar('cumulative_broadcasted_bits', broadcasted_bits, step=round_num)
      tf.summary.scalar('cumulative_aggregated_bits', aggregated_bits, step=round_num)
      summary_writer.flush()

# first, clean the log directory to avoid conflicts
try:
  tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError:
  pass 

# set up the log directory and summary writer for TensorBoard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)

train(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round=NUM_CLIENTS, summary_writer=summary_writer)

What does this error mean? I'd appreciate any help!

1 Answer

The code is mixing up tff.simulation.datasets.ClientData and tf.data.Dataset.

tf.data.Dataset does not have a client_ids attribute, while tff.simulation.datasets.ClientData does (tff.simulation.datasets.ClientData.client_ids). tff.simulation.datasets.build_single_label_dataset (which constructs fed_emnist_train here) returns a tf.data.Dataset instance, and that is what is currently being passed to np.random.choice.
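
For illustration, a quick check (using the same EMNIST loader as in the question) makes the distinction visible:

# The ClientData returned by the loader exposes client_ids;
# the flattened tf.data.Dataset does not.
emnist_train, _ = tff.simulation.datasets.emnist.load_data(only_digits=False)
print(len(emnist_train.client_ids))   # works: ClientData has client_ids
flat_ds = emnist_train.create_tf_dataset_from_all_clients()
# flat_ds.client_ids would raise the AttributeError seen above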

Possibly the input to np.random.choice was intended to be emnist_train, which is a tff.simulation.datasets.ClientData returned by tff.simulation.datasets.emnist.load_data?
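
If per-client non-IID filtering is what was intended, one possible rework (a sketch, not a drop-in fix, reusing preprocess_train_dataset and num_clients_per_round from the question) is to keep emnist_train as a ClientData, sample from its client_ids inside train(), and apply build_single_label_dataset to each sampled client's dataset:

# sketch: sample real client IDs from the ClientData, then build a
# single-label, preprocessed dataset for each sampled client
sampled_clients = np.random.choice(
    emnist_train.client_ids, size=num_clients_per_round, replace=False)

sampled_train_data = [
    preprocess_train_dataset(
        tff.simulation.datasets.build_single_label_dataset(
            emnist_train.create_tf_dataset_for_client(client),
            label_key='label', desired_label=1))
    for client in sampled_clients
]

Whether the label filter should be applied per client or over the flattened dataset depends on the experiment, so treat this only as a starting point.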