# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging.config
import os
import shlex
import shutil
import subprocess
import sys
import tempfile
import threading
import time
from argparse import Namespace
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager, Process
from multiprocessing.connection import Client
from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_constant import JobConstants, MachineStatus, WorkspaceConstants
from nvflare.apis.job_def import ALL_SITES, JobMetaKey
from nvflare.apis.utils.job_utils import convert_legacy_zipped_app_to_job
from nvflare.apis.workspace import Workspace
from nvflare.fuel.common.multi_process_executor_constants import CommunicationMetaData
from nvflare.fuel.hci.server.authz import AuthorizationService
from nvflare.fuel.sec.audit import AuditService
from nvflare.fuel.utils.network_utils import get_open_ports
from nvflare.fuel.utils.zip_utils import split_path, unzip_all_from_bytes, zip_directory_to_bytes
from nvflare.lighter.poc_commands import get_host_gpu_ids
from nvflare.private.defs import AppFolderConstants
from nvflare.private.fed.app.client.worker_process import kill_child_processes
from nvflare.private.fed.app.deployer.simulator_deployer import SimulatorDeployer
from nvflare.private.fed.client.client_status import ClientStatus
from nvflare.private.fed.server.job_meta_validator import JobMetaValidator
from nvflare.private.fed.simulator.simulator_app_runner import SimulatorServerAppRunner
from nvflare.private.fed.simulator.simulator_const import SimulatorConstants
from nvflare.private.fed.utils.fed_utils import add_logfile_handler, fobs_initialize
from nvflare.security.logging import secure_format_exception
from nvflare.security.security import EmptyAuthorizer
[docs]class SimulatorRunner(FLComponent):
def __init__(
    self, job_folder: str, workspace: str, clients=None, n_clients=None, threads=None, gpu=None, max_clients=100
):
    """Store the simulation options; no files are touched until ``setup()`` runs.

    Args:
        job_folder: folder of the job to simulate.
        workspace: simulator workspace folder; ``setup()`` picks a default when None.
        clients: comma-separated client names (split on "," in ``setup()``).
        n_clients: number of "site-N" clients to generate when ``clients`` is not given.
        threads: number of threads used to run the clients.
        gpu: comma-separated GPU ids (split on "," in ``setup()``).
        max_clients: upper bound on the number of simulated clients.
    """
    super().__init__()

    # Options supplied by the caller.
    self.job_folder, self.workspace = job_folder, workspace
    self.clients, self.n_clients = clients, n_clients
    self.threads, self.gpu = threads, gpu
    self.max_clients = max_clients

    # Runtime state, filled in by setup() / create_clients().
    self.ask_to_stop = False
    self.simulator_root = None
    self.services = None
    self.deployer = SimulatorDeployer()
    self.client_names = []
    self.federated_clients = []
    self.client_config = None
    self.deploy_args = None
def _generate_args(
self, job_folder: str, workspace: str, clients=None, n_clients=None, threads=None, gpu=None, max_clients=100
):
args = Namespace(
job_folder=job_folder,
workspace=workspace,
clients=clients,
n_clients=n_clients,
threads=threads,
gpu=gpu,
max_clients=max_clients,
)
args.set = []
return args
def setup(self):
    """Prepare the workspace, validate the job and deploy the simulator server and apps.

    Steps, in order: resolve the workspace path, build ``self.args``, work out
    the client names, configure logging, change the current working directory
    to the workspace, recreate the simulator root folder, validate the job and
    the client/GPU/thread options, create the simulator server and copy the
    job apps into the simulator root.

    Returns:
        bool: True when setup completed. False when an option check fails or
        when any exception is raised during validation/deployment (the error
        is logged instead of propagated). Errors raised before validation
        starts (for example while configuring logging) propagate.
    """
    running_dir = os.getcwd()
    if self.workspace is None:
        self.workspace = "simulator_workspace"
        self.logger.warning(
            f"Simulator workspace is not provided. Set it to the default location:"
            f" {os.path.join(running_dir, self.workspace)}"
        )
    # os.path.join keeps an absolute workspace unchanged and anchors a relative one.
    self.workspace = os.path.join(running_dir, self.workspace)

    self.args = self._generate_args(
        self.job_folder, self.workspace, self.clients, self.n_clients, self.threads, self.gpu, self.max_clients
    )

    if self.args.clients:
        # Strip every entry so "a, b" yields "a" and "b"; drop empty entries.
        self.client_names = [name.strip() for name in self.args.clients.split(",") if name.strip()]
    else:
        if self.args.n_clients is None:
            self.args.n_clients = 2
            self.logger.warning("The number of simulate clients is not provided. Set it to default: 2")
        if self.args.threads is None:
            self.args.threads = 1
            self.logger.warning("The number of threads is not provided. Set it to default: 1")
        for i in range(self.args.n_clients):
            self.client_names.append("site-" + str(i + 1))

    # Prefer the workspace's own logging config; fall back to the one shipped next to this module.
    log_config_file_path = os.path.join(self.args.workspace, "startup", WorkspaceConstants.LOGGING_CONFIG)
    if not os.path.isfile(log_config_file_path):
        log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG)
    logging.config.fileConfig(fname=log_config_file_path, disable_existing_loggers=False)
    local_dir = os.path.join(self.args.workspace, "local")
    os.makedirs(local_dir, exist_ok=True)
    shutil.copyfile(log_config_file_path, os.path.join(local_dir, WorkspaceConstants.LOGGING_CONFIG))

    self.args.log_config = None
    self.args.config_folder = "config"
    self.args.job_id = SimulatorConstants.JOB_NAME
    self.args.client_config = os.path.join(self.args.config_folder, JobConstants.CLIENT_JOB_CONFIG)
    self.args.env = os.path.join("config", AppFolderConstants.CONFIG_ENV)

    # Resolve the job folder against the starting directory before chdir-ing into the workspace.
    self.args.job_folder = os.path.join(running_dir, self.args.job_folder)

    os.makedirs(self.args.workspace, exist_ok=True)
    os.chdir(self.args.workspace)
    fobs_initialize()
    AuthorizationService.initialize(EmptyAuthorizer())
    AuditService.initialize(audit_file_name=WorkspaceConstants.AUDIT_LOG)

    # Start every simulation from an empty simulator root.
    self.simulator_root = os.path.join(self.args.workspace, SimulatorConstants.JOB_NAME)
    if os.path.exists(self.simulator_root):
        shutil.rmtree(self.simulator_root)
    os.makedirs(self.simulator_root)

    log_file = os.path.join(self.simulator_root, WorkspaceConstants.LOG_FILE_NAME)
    add_logfile_handler(log_file)

    try:
        data_bytes, job_name, meta = self.validate_job_data()

        if not self.client_names:
            self.client_names = self._extract_client_names_from_meta(meta)

        if self.max_clients < len(self.client_names):
            self.logger.error(
                f"The number of clients ({len(self.client_names)}) can not be more than the "
                f"max_number of clients ({self.max_clients})"
            )
            return False

        if self.args.gpu:
            gpus = self.args.gpu.split(",")
            host_gpus = [str(x) for x in (get_host_gpu_ids())]
            if host_gpus and not set(gpus).issubset(host_gpus):
                wrong_gpus = [x for x in gpus if x not in host_gpus]
                self.logger.error(f"These GPUs are not available: {wrong_gpus}")
                return False

            if len(gpus) <= 1:
                self.logger.error("Please provide more than 1 GPU to run the Simulator with multi-GPUs.")
                return False

            if len(gpus) > len(self.client_names):
                self.logger.error(
                    f"The number of clients ({len(self.client_names)}) must be larger than or equal to "
                    f"the number of GPUS: ({len(gpus)})"
                )
                return False

            if self.args.threads and self.args.threads > 1:
                self.logger.info(
                    "When running with multi GPU, each GPU will run with only 1 thread. Set the Threads to 1."
                )
            # Always force one thread per GPU, also when no thread count was given,
            # so the client runner receives an integer thread count.
            self.args.threads = 1

        if self.args.threads and self.args.threads > len(self.client_names):
            self.logger.error("The number of threads to run can not be larger than the number of clients.")
            return False

        if not (self.args.gpu or self.args.threads):
            self.logger.error("Please provide the number of threads or provide gpu options to run the simulator.")
            return False

        self._validate_client_names(meta, self.client_names)

        # Deploy the FL server
        self.logger.info("Create the Simulator Server.")
        simulator_server, self.services = self.deployer.create_fl_server(self.args)
        self.services.deploy(self.args, grpc_args=simulator_server)

        self.logger.info("Deploy the Apps.")
        self._deploy_apps(job_name, data_bytes, meta)

        return True

    except BaseException as e:
        # Kept broad on purpose: callers rely on a False return rather than an exception.
        self.logger.error(f"Simulator setup error: {secure_format_exception(e)}")
        return False
def validate_job_data(self):
    """Zip the job folder, convert it to the job format and validate its meta.

    Returns:
        tuple: ``(data_bytes, job_name, meta)`` for a valid job.

    Raises:
        RuntimeError: carrying the validator's error when the job is not valid.
    """
    job_folder = self.args.job_folder
    # The job name is the second element returned by split_path for the job folder.
    job_name = split_path(job_folder)[1]
    data_bytes = convert_legacy_zipped_app_to_job(zip_directory_to_bytes("", job_folder))

    valid, error, meta = JobMetaValidator().validate(job_name, data_bytes)
    if valid:
        return data_bytes, job_name, meta
    raise RuntimeError(error)
def _extract_client_names_from_meta(self, meta):
    """Collect the client names listed in the job's deploy map.

    Every participant of every app is returned, in deploy-map order, except
    the ``ALL_SITES`` marker (compared case-insensitively) and "server".
    """
    deploy_map = meta.get(JobMetaKey.DEPLOY_MAP)
    return [
        site
        for sites in deploy_map.values()
        for site in sites
        if site.upper() != ALL_SITES and site != "server"
    ]
def _validate_client_names(self, meta, client_names):
    """Check that the deploy map provides an app for every client name.

    A client is covered when some app lists it explicitly, or when some app's
    only participant is the ``ALL_SITES`` marker.

    Raises:
        RuntimeError: naming the clients that no app is deployed to.
    """

    def _has_app(name):
        return any(
            (len(sites) == 1 and sites[0].upper() == ALL_SITES) or name in sites
            for sites in meta.get(JobMetaKey.DEPLOY_MAP).values()
        )

    no_app_clients = [name for name in client_names if not _has_app(name)]
    if no_app_clients:
        raise RuntimeError(f"The job does not have App to run for clients: {no_app_clients}")
def _deploy_apps(self, job_name, data_bytes, meta):
    """Unpack the job and copy each app into the simulator root.

    For every app in the deploy map, the app folder is copied to
    ``<simulator_root>/app_server`` for the "server" participant and to
    ``<simulator_root>/app_<client>`` for every participant that is one of
    ``self.client_names``. An app whose only participant is the ``ALL_SITES``
    marker is deployed to the server and to all clients.

    Args:
        job_name: name of the job folder inside the zipped data.
        data_bytes: zipped job content.
        meta: job meta containing the deploy map.
    """
    # TemporaryDirectory already hands back a fresh, empty directory, so it is
    # used as is (removing and recreating it would only open a race window).
    with tempfile.TemporaryDirectory() as temp_dir:
        unzip_all_from_bytes(data_bytes, temp_dir)
        temp_job_folder = os.path.join(temp_dir, job_name)

        app_server_root = os.path.join(self.simulator_root, "app_server")
        for app_name, participants in meta.get(JobMetaKey.DEPLOY_MAP).items():
            if len(participants) == 1 and participants[0].upper() == ALL_SITES:
                # Build a new list so the deploy map inside meta is not modified.
                participants = ["server", *self.client_names]

            app = os.path.join(temp_job_folder, app_name)
            for p in participants:
                if p == "server":
                    shutil.copytree(app, app_server_root)
                elif p in self.client_names:
                    shutil.copytree(app, os.path.join(self.simulator_root, "app_" + p))
def split_clients(self, clients: list, gpus: list):
    """Distribute ``clients`` over ``gpus`` round-robin.

    Args:
        clients: the clients to distribute.
        gpus: the GPU ids; one group is produced per entry.

    Returns:
        list: ``len(gpus)`` lists; group ``i`` holds the clients at positions
        ``i, i + len(gpus), ...`` of ``clients``.
    """
    groups = [[] for _ in gpus]
    for position, client in enumerate(clients):
        groups[position % len(groups)].append(client)
    return groups
def create_clients(self):
    """Create one simulated client per client name and mark them all as started.

    Each client's ``custom`` app folder is appended to ``sys.path``.
    ``self.client_config`` and ``self.deploy_args`` end up holding the values
    returned for the last client created.
    """
    # Deploy the FL clients
    self.logger.info("Create the simulate clients.")
    for name in self.client_names:
        created = self.deployer.create_fl_client(name, self.args)
        client, self.client_config, self.deploy_args = created
        self.federated_clients.append(client)

        custom_dir = os.path.join(self.simulator_root, "app_" + name, "custom")
        sys.path.append(custom_dir)

    self.logger.info("Set the client status ready.")
    self._set_client_status()
def _set_client_status(self):
    """Point every client at its app folder, hand it the args and mark it STARTED and idle."""
    for fed_client in self.federated_clients:
        fed_client.app_client_root = os.path.join(self.simulator_root, "app_" + fed_client.client_name)
        fed_client.args = self.args
        fed_client.simulate_running = False
        fed_client.status = ClientStatus.STARTED
def run(self):
    """Run the whole simulation in a child process and return its run status.

    Returns:
        The ``run_status`` value reported by ``run_processs`` through the
        shared dict. If the child ended without reporting one, its non-zero
        exit code is returned, or 2 (the run-error status) when the exit code
        is 0 or unknown. None is returned after a KeyboardInterrupt.
    """
    try:
        manager = Manager()
        return_dict = manager.dict()
        process = Process(target=self.run_processs, args=(return_dict,))
        process.start()
        process.join()

        run_status = return_dict.get("run_status")
        if run_status is None:
            # The child died before it could store its status (e.g. it was killed).
            self.logger.error(
                f"Simulator process ended with exit code {process.exitcode} without reporting a run status."
            )
            run_status = process.exitcode or 2
        return run_status
    except KeyboardInterrupt:
        self.logger.info("KeyboardInterrupt, terminate all the child processes.")
        kill_child_processes(os.getpid())
def run_processs(self, return_dict):
    """Body of the simulator child process: set up, run server and clients, report status.

    The status is stored in ``return_dict["run_status"]``: 0 when the run
    finished, 1 when ``setup()`` failed, 2 when the run raised. The process
    then terminates through ``os._exit(0)``, so the status is only conveyed
    through ``return_dict``.
    """
    if not self.setup():
        run_status = 1
    else:
        try:
            self.create_clients()

            self.logger.info("Deploy and start the Server App.")
            server_thread = threading.Thread(target=self.start_server_app)
            server_thread.start()

            # wait for the server app is started
            while self.services.engine.engine_info.status != MachineStatus.STARTED:
                time.sleep(1.0)
                if not server_thread.is_alive():
                    raise RuntimeError("Could not start the Server App.")

            # One client group per GPU, or a single group holding every client without GPUs.
            if self.args.gpu:
                device_ids = self.args.gpu.split(",")
                client_groups = self.split_clients(self.federated_clients, device_ids)
            else:
                device_ids = [None]
                client_groups = [self.federated_clients]

            pool = ThreadPoolExecutor(max_workers=len(device_ids))
            for group, device_id in zip(client_groups, device_ids):
                pool.submit(self.client_run, group, device_id)

            # shutdown() waits for all the submitted client runs to finish.
            pool.shutdown()
            server_thread.join()
            run_status = 0
        except BaseException as e:
            self.logger.error(f"Simulator run error: {secure_format_exception(e)}")
            run_status = 2
        finally:
            self.deployer.close()

    return_dict["run_status"] = run_status
    os._exit(0)
def client_run(self, clients, gpu):
    """Run the given group of clients with a new ``SimulatorClientRunner`` on ``gpu``."""
    SimulatorClientRunner(self.args, clients, self.client_config, self.deploy_args).run(gpu)
def start_server_app(self):
    """Start the server app from ``<simulator_root>/app_server``.

    Sets ``self.args.server_config``, appends the server app's ``custom``
    folder to ``sys.path``, makes sure the workspace's startup and site
    folders exist, then hands over to ``SimulatorServerAppRunner`` without a
    snapshot.
    """
    app_server_root = os.path.join(self.simulator_root, "app_server")
    self.args.server_config = os.path.join("config", JobConstants.SERVER_JOB_CONFIG)
    sys.path.append(os.path.join(app_server_root, "custom"))

    workspace_root = self.args.workspace
    for folder_name in (WorkspaceConstants.STARTUP_FOLDER_NAME, WorkspaceConstants.SITE_FOLDER_NAME):
        os.makedirs(os.path.join(workspace_root, folder_name), exist_ok=True)

    server_workspace = Workspace(root_dir=workspace_root, site_name="server")
    SimulatorServerAppRunner().start_server_app(
        server_workspace, self.services, self.args, app_server_root, self.args.job_id, None, self.logger
    )
[docs]class SimulatorClientRunner(FLComponent):
def __init__(self, args, clients: list, client_config, deploy_args):
    """Keep what is needed to run a group of simulated clients.

    Args:
        args: simulator arguments; ``workspace`` is read here, ``threads`` when running.
        clients: the clients this runner is responsible for.
        client_config: client configuration sent to each worker process.
        deploy_args: deploy arguments sent to each worker process.
    """
    super().__init__()
    self.args = args
    self.federated_clients = clients
    # Index of the client handed out last; -1 makes the first pick start at index 0.
    self.run_client_index = -1
    self.simulator_root = os.path.join(self.args.workspace, SimulatorConstants.JOB_NAME)
    self.client_config = client_config
    self.deploy_args = deploy_args
def run(self, gpu):
    """Run this runner's clients with ``args.threads`` worker threads, then close them.

    Errors are logged rather than raised; every client is closed afterwards
    in all cases.

    Args:
        gpu: GPU id forwarded to the worker threads; may be None.
    """
    try:
        self.logger.info("Start the clients run simulation.")
        thread_count = self.args.threads
        pool = ThreadPoolExecutor(max_workers=thread_count)
        shared_lock = threading.Lock()
        for _ in range(thread_count):
            pool.submit(self.run_client_thread, thread_count, gpu, shared_lock)

        # wait for the server and client running thread to finish.
        pool.shutdown()
    except BaseException as e:
        self.logger.error(f"SimulatorClientRunner run error: {secure_format_exception(e)}")
    finally:
        for client in self.federated_clients:
            client.close()
def run_client_thread(self, num_of_threads, gpu, lock):
    """Worker-thread loop: keep running clients until a task reports stop.

    Each round sleeps one second, picks a client under ``lock`` (the one
    handed back by the previous round, otherwise the next idle one), marks
    it as running, runs it through ``do_one_task`` and marks it idle again.
    Errors are logged and end the loop.
    """
    finished = False
    pending_client = None  # client chosen by the previous round, if any
    try:
        while not finished:
            time.sleep(1)
            with lock:
                current = pending_client or self.get_next_run_client()
                current.simulate_running = True

            finished, pending_client = self.do_one_task(current, num_of_threads, gpu, lock)

            current.simulate_running = False
    except BaseException as e:
        self.logger.error(f"run_client_thread error: {secure_format_exception(e)}")
def do_one_task(self, client, num_of_threads, gpu, lock):
    """Run ``client`` in a simulator worker process until it has to yield or stop.

    A worker process is started on a free port, the client together with the
    client config and deploy args is sent to it, and then the worker's
    reports are handled one by one: after each report the next client is
    chosen under ``lock``; the worker is told to continue (True) while it has
    not reported stop and the next client is the same one, otherwise it is
    told to finish (False).

    Args:
        client: the client to run.
        num_of_threads: number of client threads; when it equals the number of
            clients, each thread keeps its own client.
        gpu: GPU id passed to the worker via ``--gpu``; skipped when falsy.
        lock: lock guarding the choice of the next client.

    Returns:
        tuple: ``(stop_run, next_client)`` - the last value reported by the
        worker and the client to run next.
    """
    open_port = get_open_ports(1)[0]

    # Build argv directly instead of splitting a concatenated string, so that
    # paths containing spaces or '#' reach the worker unchanged.
    command = [
        sys.executable,
        "-m",
        "nvflare.private.fed.app.simulator.simulator_worker",
        "-o",
        str(self.args.workspace),
        "--client",
        str(client.client_name),
        "--port",
        str(open_port),
        "--parent_pid",
        str(os.getpid()),
    ]
    if gpu:
        command.extend(["--gpu", str(gpu)])

    # start_new_session=True makes the child call setsid() itself; unlike
    # preexec_fn it is safe to use from a multi-threaded process.
    _ = subprocess.Popen(command, start_new_session=True, env=os.environ.copy())

    conn = self._create_connection(open_port)
    try:
        data = {
            SimulatorConstants.CLIENT: client,
            SimulatorConstants.CLIENT_CONFIG: self.client_config,
            SimulatorConstants.DEPLOY_ARGS: self.deploy_args,
        }
        conn.send(data)

        while True:
            stop_run = conn.recv()

            with lock:
                if num_of_threads != len(self.federated_clients):
                    next_client = self.get_next_run_client()
                else:
                    next_client = client

            if not stop_run and next_client.client_name == client.client_name:
                conn.send(True)
            else:
                conn.send(False)
                break
    finally:
        # Release the connection whether the exchange finished or failed.
        conn.close()

    return stop_run, next_client
def _create_connection(self, open_port):
    """Connect to the worker process listening on ``localhost:open_port``.

    The connection is retried once per second, without a time limit, until
    it succeeds. KeyboardInterrupt and SystemExit are not swallowed by the
    retry loop.

    Returns:
        The established ``multiprocessing.connection`` client connection.
    """
    address = ("localhost", open_port)
    authkey = CommunicationMetaData.CHILD_PASSWORD.encode()
    while True:
        try:
            return Client(address, authkey=authkey)
        except Exception:
            # The worker may not be listening yet; wait and try again.
            time.sleep(1.0)
def get_next_run_client(self):
    """Return the next client, in round-robin order, that is not currently running.

    ``self.run_client_index`` is advanced (wrapping around) to the position
    of the returned client. The search does not stop until a client whose
    ``simulate_running`` is false is found.
    """
    candidate = None
    while candidate is None or candidate.simulate_running:
        self.run_client_index = (self.run_client_index + 1) % len(self.federated_clients)
        candidate = self.federated_clients[self.run_client_index]

    self.logger.info(f"Simulate Run client: {candidate.client_name}")
    return candidate