# Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import io
import json
import logging
import os
import shutil
from typing import Dict, List
import nvflare.fuel.hci.file_transfer_defs as ftd
from nvflare.apis.fl_constant import AdminCommandNames
from nvflare.apis.job_def import Job, JobDataKey, JobMetaKey, TopDir
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec, RunStatus
from nvflare.apis.utils.job_utils import convert_legacy_zipped_app_to_job
from nvflare.fuel.hci.base64_utils import b64str_to_bytes, bytes_to_b64str
from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.proto import ConfirmMethod
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
from nvflare.fuel.hci.server.authz import PreAuthzReturnCode
from nvflare.fuel.hci.server.constants import ConnProps
from nvflare.fuel.hci.table import Table
from nvflare.fuel.utils.argument_utils import SafeArgumentParser
from nvflare.fuel.utils.obj_utils import get_size
from nvflare.fuel.utils.zip_utils import ls_zip_from_bytes, unzip_all_from_bytes, zip_directory_to_bytes
from nvflare.private.defs import RequestHeader, TrainingTopic
from nvflare.private.fed.server.admin import new_message
from nvflare.private.fed.server.job_meta_validator import JobMetaValidator
from nvflare.private.fed.server.server_engine import ServerEngine
from nvflare.private.fed.server.server_engine_internal_spec import ServerEngineInternalSpec
from nvflare.security.logging import secure_format_exception, secure_log_traceback
from .cmd_utils import CommandUtil
MAX_DOWNLOAD_JOB_SIZE = 50 * 1024 * 1024 * 1204
CLONED_META_KEYS = {
JobMetaKey.JOB_NAME.value,
JobMetaKey.JOB_FOLDER_NAME.value,
JobMetaKey.DEPLOY_MAP.value,
JobMetaKey.RESOURCE_SPEC.value,
JobMetaKey.CONTENT_LOCATION.value,
JobMetaKey.RESULT_LOCATION.value,
JobMetaKey.APPROVALS.value,
JobMetaKey.MIN_CLIENTS.value,
JobMetaKey.MANDATORY_CLIENTS.value,
}
[docs]class JobCommandModule(CommandModule, CommandUtil):
"""Command module with commands for job management."""
def __init__(self):
super().__init__()
self.logger = logging.getLogger(self.__class__.__name__)
[docs] def get_spec(self):
return CommandModuleSpec(
name="job_mgmt",
cmd_specs=[
CommandSpec(
name=AdminCommandNames.DELETE_WORKSPACE,
description="delete the workspace of a job",
usage="delete_workspace job_id",
handler_func=self.delete_job_id,
authz_func=self.authorize_job,
enabled=False,
confirm=ConfirmMethod.AUTH,
),
CommandSpec(
name=AdminCommandNames.START_APP,
description="start the FL app",
usage="start_app job_id server|client|all",
handler_func=self.start_app,
authz_func=self.authorize_job,
),
CommandSpec(
name=AdminCommandNames.LIST_JOBS,
description="list submitted jobs",
usage="list_jobs [-n name_prefix] [-d] [job_id_prefix]",
handler_func=self.list_jobs,
authz_func=self.command_authz_required,
),
CommandSpec(
name=AdminCommandNames.DELETE_JOB,
description="delete a job and persisted workspace",
usage="delete_job job_id",
handler_func=self.delete_job,
authz_func=self.authorize_job,
confirm=ConfirmMethod.AUTH,
),
CommandSpec(
name=AdminCommandNames.ABORT_JOB,
description="abort a job if it is running or dispatched",
usage="abort_job job_id",
handler_func=self.abort_job, # see if running, if running, send abort command
authz_func=self.authorize_job,
confirm=ConfirmMethod.YESNO,
),
CommandSpec(
name=AdminCommandNames.ABORT_TASK,
description="abort the client current task execution",
usage="abort_task job_id <client-name>",
handler_func=self.abort_task,
authz_func=self.authorize_abort_client_task,
),
CommandSpec(
name=AdminCommandNames.CLONE_JOB,
description="clone a job with a new job_id",
usage="clone_job job_id",
handler_func=self.clone_job,
authz_func=self.authorize_job,
),
CommandSpec(
name=AdminCommandNames.SUBMIT_JOB,
description="submit a job",
usage="submit_job job_folder",
handler_func=self.submit_job,
authz_func=self.command_authz_required,
client_cmd=ftd.UPLOAD_FOLDER_FQN,
),
CommandSpec(
name=AdminCommandNames.DOWNLOAD_JOB,
description="download a specified job",
usage="download_job job_id",
handler_func=self.download_job,
authz_func=self.authorize_job,
client_cmd=ftd.DOWNLOAD_FOLDER_FQN,
),
],
)
[docs] def authorize_job(self, conn: Connection, args: List[str]):
if len(args) < 2:
conn.append_error("syntax error: missing job_id")
return PreAuthzReturnCode.ERROR
job_id = args[1].lower()
conn.set_prop(self.JOB_ID, job_id)
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
with engine.new_context() as fl_ctx:
job = job_def_manager.get_job(job_id, fl_ctx)
if not job:
conn.append_error(f"Job with ID {job_id} doesn't exist")
return PreAuthzReturnCode.ERROR
conn.set_prop(self.JOB, job)
conn.set_prop(ConnProps.SUBMITTER_NAME, job.meta.get(JobMetaKey.SUBMITTER_NAME, ""))
conn.set_prop(ConnProps.SUBMITTER_ORG, job.meta.get(JobMetaKey.SUBMITTER_ORG, ""))
conn.set_prop(ConnProps.SUBMITTER_ROLE, job.meta.get(JobMetaKey.SUBMITTER_ROLE, ""))
if len(args) > 2:
err = self.validate_command_targets(conn, args[2:])
if err:
conn.append_error(err)
return PreAuthzReturnCode.ERROR
return PreAuthzReturnCode.REQUIRE_AUTHZ
[docs] def abort_task(self, conn, args: List[str]) -> str:
engine = conn.app_ctx
if not isinstance(engine, ServerEngineInternalSpec):
raise TypeError("engine must be ServerEngineInternalSpec but got {}".format(type(engine)))
job_id = conn.get_prop(self.JOB_ID)
message = new_message(conn, topic=TrainingTopic.ABORT_TASK, body="", require_authz=False)
message.set_header(RequestHeader.JOB_ID, str(job_id))
replies = self.send_request_to_clients(conn, message)
return self.process_replies_to_table(conn, replies)
# Start App
def _start_app_on_server(self, conn: Connection, job_id: str) -> bool:
engine = conn.app_ctx
err = engine.start_app_on_server(job_id)
if err:
conn.append_error(err)
return False
else:
conn.append_string("Server app is starting....")
return True
def _start_app_on_clients(self, conn: Connection, job_id: str) -> bool:
engine = conn.app_ctx
err = engine.check_app_start_readiness(job_id)
if err:
conn.append_error(err)
return False
message = new_message(conn, topic=TrainingTopic.START, body="", require_authz=False)
message.set_header(RequestHeader.JOB_ID, job_id)
replies = self.send_request_to_clients(conn, message)
self.process_replies_to_table(conn, replies)
return True
[docs] def start_app(self, conn: Connection, args: List[str]):
engine = conn.app_ctx
if not isinstance(engine, ServerEngineInternalSpec):
raise TypeError("engine must be ServerEngineInternalSpec but got {}".format(type(engine)))
job_id = conn.get_prop(self.JOB_ID)
target_type = args[2]
if target_type == self.TARGET_TYPE_SERVER:
if not self._start_app_on_server(conn, job_id):
return
elif target_type == self.TARGET_TYPE_CLIENT:
if not self._start_app_on_clients(conn, job_id):
return
else:
# all
success = self._start_app_on_server(conn, job_id)
if success:
client_names = conn.get_prop(self.TARGET_CLIENT_NAMES, None)
if client_names:
if not self._start_app_on_clients(conn, job_id):
return
conn.append_success("")
[docs] def delete_job_id(self, conn: Connection, args: List[str]):
job_id = args[1]
engine = conn.app_ctx
if not isinstance(engine, ServerEngine):
raise TypeError("engine must be ServerEngine but got {}".format(type(engine)))
if job_id in engine.run_processes.keys():
conn.append_error(f"Current running run_{job_id} can not be deleted.")
return
err = engine.delete_job_id(job_id)
if err:
conn.append_error(err)
return
# ask clients to delete this RUN
message = new_message(conn, topic=TrainingTopic.DELETE_RUN, body="", require_authz=False)
message.set_header(RequestHeader.JOB_ID, str(job_id))
clients = engine.get_clients()
if clients:
conn.set_prop(self.TARGET_CLIENT_TOKENS, [x.token for x in clients])
replies = self.send_request_to_clients(conn, message)
self.process_replies_to_table(conn, replies)
conn.append_success("")
[docs] def list_jobs(self, conn: Connection, args: List[str]):
try:
parser = SafeArgumentParser(prog="list_jobs")
parser.add_argument("job_id", nargs="?", help="Job ID prefix")
parser.add_argument("-d", action="store_true", help="Show detailed list")
parser.add_argument("-n", help="Filter by job name prefix")
parsed_args = parser.parse_args(args[1:])
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
jobs = job_def_manager.get_all_jobs(fl_ctx)
if jobs:
id_prefix = parsed_args.job_id
name_prefix = parsed_args.n
filtered_jobs = [job for job in jobs if self._job_match(job.meta, id_prefix, name_prefix)]
if not filtered_jobs:
conn.append_error("No jobs matching the searching criteria")
return
filtered_jobs.sort(key=lambda job: job.meta.get(JobMetaKey.SUBMIT_TIME, 0.0))
if parsed_args.d:
self._send_detail_list(conn, filtered_jobs)
else:
self._send_summary_list(conn, filtered_jobs)
else:
conn.append_string("No jobs.")
except Exception as e:
conn.append_error(secure_format_exception(e))
return
conn.append_success("")
[docs] def delete_job(self, conn: Connection, args: List[str]):
job = conn.get_prop(self.JOB)
if not job:
conn.append_error("program error: job not set in conn")
return
job_id = conn.get_prop(self.JOB_ID)
if job.meta.get(JobMetaKey.STATUS, "") in [RunStatus.DISPATCHED.value, RunStatus.RUNNING.value]:
conn.append_error(f"job: {job_id} is running, could not be deleted at this time.")
return
try:
engine = conn.app_ctx
job_def_manager = engine.job_def_manager
with engine.new_context() as fl_ctx:
job_def_manager.delete(job_id, fl_ctx)
conn.append_string("Job {} deleted.".format(job_id))
except BaseException as e:
conn.append_error(f"exception occurred: {secure_format_exception(e)}")
return
conn.append_success("")
[docs] def abort_job(self, conn: Connection, args: List[str]):
engine = conn.app_ctx
job_runner = engine.job_runner
try:
job_id = conn.get_prop(self.JOB_ID)
with engine.new_context() as fl_ctx:
job_runner.stop_run(job_id, fl_ctx)
conn.append_string("Abort signal has been sent to the server app.")
conn.append_success("")
except Exception as e:
conn.append_error(f"Exception occurred trying to abort job: {secure_format_exception(e)}")
return
[docs] def clone_job(self, conn: Connection, args: List[str]):
job = conn.get_prop(self.JOB)
job_id = conn.get_prop(self.JOB_ID)
engine = conn.app_ctx
try:
if not isinstance(engine, ServerEngine):
raise TypeError(f"engine is not of type ServerEngine, but got {type(engine)}")
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
data_bytes = job_def_manager.get_content(job_id, fl_ctx)
job_meta = {str(k): job.meta[k] for k in job.meta.keys() & CLONED_META_KEYS}
# set the submitter info for the new job
job_meta[JobMetaKey.SUBMITTER_NAME.value] = conn.get_prop(ConnProps.USER_NAME)
job_meta[JobMetaKey.SUBMITTER_ORG.value] = conn.get_prop(ConnProps.USER_ORG)
job_meta[JobMetaKey.SUBMITTER_ROLE.value] = conn.get_prop(ConnProps.USER_ROLE)
job_meta[JobMetaKey.CLONED_FROM.value] = job_id
meta = job_def_manager.create(job_meta, data_bytes, fl_ctx)
conn.append_string("Cloned job {} as: {}".format(job_id, meta.get(JobMetaKey.JOB_ID)))
except Exception as e:
conn.append_error(f"Exception occurred trying to clone job: {secure_format_exception(e)}")
return
conn.append_success("")
[docs] def authorize_list_files(self, conn: Connection, args: List[str]):
if len(args) < 2:
conn.append_error("syntax error: missing job_id")
return False, None
if len(args) > 3:
conn.append_error("syntax error: too many arguments")
return False, None
return self.authorize_job(conn=conn, args=args[:2])
[docs] def list_files(self, conn: Connection, args: List[str]):
job_id = conn.get_prop(self.JOB_ID)
if len(args) == 2:
conn.append_string("job\nworkspace\n\nSpecify the job or workspace dir to see detailed contents.")
return
else:
file = args[2]
engine = conn.app_ctx
try:
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
job_data = job_def_manager.get_job_data(job_id, fl_ctx)
if file.startswith(TopDir.JOB):
file = file[len(TopDir.JOB) :]
file = file.lstrip("/")
data_bytes = job_data[JobDataKey.JOB_DATA.value]
ls_info = ls_zip_from_bytes(data_bytes)
elif file.startswith(TopDir.WORKSPACE):
file = file[len(TopDir.WORKSPACE) :]
file = file.lstrip("/")
workspace_bytes = job_data[JobDataKey.WORKSPACE_DATA.value]
ls_info = ls_zip_from_bytes(workspace_bytes)
else:
conn.append_error("syntax error: top level directory must be job or workspace")
return
return_string = "%-46s %19s %12s\n" % ("File Name", "Modified ", "Size")
for zinfo in ls_info:
date = "%d-%02d-%02d %02d:%02d:%02d" % zinfo.date_time[:6]
if zinfo.filename.startswith(file):
return_string += "%-46s %s %12d\n" % (zinfo.filename, date, zinfo.file_size)
conn.append_string(return_string)
except Exception as e:
secure_log_traceback()
conn.append_error(f"Exception occurred trying to get job from store: {secure_format_exception(e)}")
return
conn.append_success("")
@staticmethod
def _job_match(job_meta: Dict, id_prefix: str, name_prefix: str) -> bool:
return ((not id_prefix) or job_meta.get("job_id").lower().startswith(id_prefix.lower())) and (
(not name_prefix) or job_meta.get("name").lower().startswith(name_prefix.lower())
)
@staticmethod
def _send_detail_list(conn: Connection, jobs: List[Job]):
for job in jobs:
JobCommandModule._set_duration(job)
conn.append_string(json.dumps(job.meta, indent=4))
@staticmethod
def _send_summary_list(conn: Connection, jobs: List[Job]):
table = Table(["Job ID", "Name", "Status", "Submit Time", "Run Duration"])
for job in jobs:
JobCommandModule._set_duration(job)
table.add_row(
[
job.meta.get(JobMetaKey.JOB_ID, ""),
CommandUtil.get_job_name(job.meta),
job.meta.get(JobMetaKey.STATUS, ""),
job.meta.get(JobMetaKey.SUBMIT_TIME_ISO, ""),
str(job.meta.get(JobMetaKey.DURATION, "N/A")),
]
)
writer = io.StringIO()
table.write(writer)
conn.append_string(writer.getvalue())
@staticmethod
def _set_duration(job):
if job.meta.get(JobMetaKey.STATUS) == RunStatus.RUNNING.value:
start_time = datetime.datetime.strptime(job.meta.get(JobMetaKey.START_TIME), "%Y-%m-%d %H:%M:%S.%f")
duration = datetime.datetime.now() - start_time
job.meta[JobMetaKey.DURATION] = str(duration)
[docs] def submit_job(self, conn: Connection, args: List[str]):
folder_name = args[1]
zip_b64str = args[2]
data_bytes = convert_legacy_zipped_app_to_job(b64str_to_bytes(zip_b64str))
engine = conn.app_ctx
try:
with engine.new_context() as fl_ctx:
job_validator = JobMetaValidator()
valid, error, meta = job_validator.validate(folder_name, data_bytes)
if not valid:
conn.append_error(error)
return
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
# set submitter info
meta[JobMetaKey.SUBMITTER_NAME.value] = conn.get_prop(ConnProps.USER_NAME, "")
meta[JobMetaKey.SUBMITTER_ORG.value] = conn.get_prop(ConnProps.USER_ORG, "")
meta[JobMetaKey.SUBMITTER_ROLE.value] = conn.get_prop(ConnProps.USER_ROLE, "")
meta = job_def_manager.create(meta, data_bytes, fl_ctx)
conn.append_string("Submitted job: {}".format(meta.get(JobMetaKey.JOB_ID)))
except Exception as e:
conn.append_error(f"Exception occurred trying to submit job: {secure_format_exception(e)}")
return
conn.append_success("")
def _unzip_data(self, download_dir, job_data, job_id):
job_id_dir = os.path.join(download_dir, job_id)
if os.path.exists(job_id_dir):
shutil.rmtree(job_id_dir)
os.mkdir(job_id_dir)
data_bytes = job_data[JobDataKey.JOB_DATA.value]
job_dir = os.path.join(job_id_dir, "job")
os.mkdir(job_dir)
unzip_all_from_bytes(data_bytes, job_dir)
workspace_bytes = job_data[JobDataKey.WORKSPACE_DATA.value]
workspace_dir = os.path.join(job_id_dir, "workspace")
os.mkdir(workspace_dir)
if workspace_bytes is not None:
unzip_all_from_bytes(workspace_bytes, workspace_dir)
[docs] def download_job(self, conn: Connection, args: List[str]):
job_id = args[1]
download_dir = conn.get_prop(ConnProps.DOWNLOAD_DIR)
download_job_url = conn.get_prop(ConnProps.DOWNLOAD_JOB_URL)
engine = conn.app_ctx
try:
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
job_data = job_def_manager.get_job_data(job_id, fl_ctx)
size = get_size(job_data, seen=None)
if size > MAX_DOWNLOAD_JOB_SIZE:
conn.append_string(ftd.DOWNLOAD_URL_MARKER + download_job_url + job_id)
return
self._unzip_data(download_dir, job_data, job_id)
except Exception as e:
conn.append_error(f"Exception occurred trying to get job from store: {secure_format_exception(e)}")
return
try:
data = zip_directory_to_bytes(download_dir, job_id)
b64str = bytes_to_b64str(data)
conn.append_string(b64str)
except FileNotFoundError:
conn.append_error("No record found for job '{}'".format(job_id))
except BaseException:
secure_log_traceback()
conn.append_error("Exception occurred during attempt to zip data to send for job: {}".format(job_id))