# Source code for nvflare.fuel.hci.server.authz

# Copyright (c) 2021-2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import logging
from typing import List

from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.reg import CommandEntry
from nvflare.fuel.sec.authz import AuthorizationService, AuthzContext, Person

from .constants import ConnProps
from .reg import CommandFilter

log = logging.getLogger(__name__)


@enum.unique
class PreAuthzReturnCode(enum.Enum):
    """Outcome reported by a command's pre-authorization function."""

    # The command was preprocessed successfully and needs no authorization.
    OK = 0

    # An error occurred while processing the command.
    ERROR = 1

    # The command was preprocessed successfully; further authorization is required.
    REQUIRE_AUTHZ = 2
def command_handler_func_signature(conn: Connection, args: List[str]):
    """No-op placeholder illustrating the signature of a command handler function.

    Args:
        conn: the connection the command arrived on.
        args: the command arguments, as a list of strings.

    Returns:
        None; the body intentionally does nothing.
    """
def command_authz_func_signature(conn: Connection, args: List[str]) -> PreAuthzReturnCode:
    """No-op placeholder illustrating the signature of a command authz function.

    Args:
        conn: the connection the command arrived on.
        args: the command arguments, as a list of strings.

    Returns:
        Annotated as PreAuthzReturnCode; this placeholder itself does nothing
        and therefore returns None.
    """
class AuthzFilter(CommandFilter):
    """Command filter that runs an authorization check before an admin command."""

    def __init__(self):
        """Filter for authorization of admin commands."""
        CommandFilter.__init__(self)

    def pre_command(self, conn: Connection, args: List[str]):
        """Decide whether the command on this connection passes authorization.

        The command entry is read from the connection's CMD_ENTRY property. If
        the entry has an ``authz_func``, it is called first; its return code
        decides whether a full authorization check is needed.

        Args:
            conn: the connection carrying the command entry and the
                user/submitter identity properties.
            args: the command arguments, passed through to ``authz_func``.

        Returns:
            True when there is no command entry, the entry has no
            ``authz_func``, ``authz_func`` returns ``PreAuthzReturnCode.OK``,
            or the authorization service authorizes the request.
            False when ``authz_func`` returns ``PreAuthzReturnCode.ERROR``, or
            when the authorization service reports an error or denies the
            request (an error message is appended to ``conn`` in the latter
            two cases).
        """
        cmd_entry = conn.get_prop(ConnProps.CMD_ENTRY, None)
        if not cmd_entry:
            return True

        assert isinstance(cmd_entry, CommandEntry)
        authz_func = cmd_entry.authz_func
        if not authz_func:
            return True

        return_code = authz_func(conn, args)
        if return_code == PreAuthzReturnCode.OK:
            return True

        if return_code == PreAuthzReturnCode.ERROR:
            return False

        # Any other return code falls through to the full check.
        # authz required - the command name is the name of the right to be checked!
        user = Person(
            name=conn.get_prop(ConnProps.USER_NAME, ""),
            org=conn.get_prop(ConnProps.USER_ORG, ""),
            role=conn.get_prop(ConnProps.USER_ROLE, ""),
        )

        # The role must come from the submitter's role property; reading
        # SUBMITTER_ORG here would pass the organization off as the role.
        # NOTE(review): relies on ConnProps.SUBMITTER_ROLE being defined in
        # .constants alongside SUBMITTER_NAME/SUBMITTER_ORG - confirm.
        submitter = Person(
            name=conn.get_prop(ConnProps.SUBMITTER_NAME, ""),
            org=conn.get_prop(ConnProps.SUBMITTER_ORG, ""),
            role=conn.get_prop(ConnProps.SUBMITTER_ROLE, ""),
        )

        ctx = AuthzContext(user=user, submitter=submitter, right=cmd_entry.name)

        # Lazy %-style arguments: the message is only built if DEBUG is enabled.
        log.debug("User: %s Submitter: %s Right: %s", user, submitter, cmd_entry.name)

        authorized, err = AuthorizationService.authorize(ctx)
        if err:
            conn.append_error("Authorization Error: {}".format(err))
            return False

        if not authorized:
            conn.append_error("This action is not authorized")
            return False

        return True