Source code for nvflare.app_common.decomposers.common_decomposers

# Copyright (c) 2021-2022, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Decomposers for types from app_common and Machine Learning libraries."""
import os
from abc import ABC
from io import BytesIO
from typing import Any

import numpy as np

from nvflare.app_common.abstract.learnable import Learnable
from nvflare.app_common.widgets.event_recorder import _CtxPropReq, _EventReq, _EventStats
from nvflare.fuel.utils import fobs


[docs]class LearnableDecomposer(fobs.Decomposer):
[docs] @staticmethod def supported_type(): return Learnable
[docs] def decompose(self, target: Learnable) -> Any: return target.copy()
[docs] def recompose(self, data: Any) -> Learnable: obj = Learnable() for k, v in data.items(): obj[k] = v return obj
[docs]class NumpyScalarDecomposer(fobs.Decomposer, ABC): """Decomposer base class for all numpy types with item method."""
[docs] def decompose(self, target: Any) -> Any: return target.item()
[docs] def recompose(self, data: Any) -> np.ndarray: return self.supported_type()(data)
[docs]class Float64ScalarDecomposer(NumpyScalarDecomposer):
[docs] @staticmethod def supported_type(): return np.float64
[docs]class Float32ScalarDecomposer(NumpyScalarDecomposer):
[docs] @staticmethod def supported_type(): return np.float32
[docs]class Int64ScalarDecomposer(NumpyScalarDecomposer):
[docs] @staticmethod def supported_type(): return np.int64
[docs]class Int32ScalarDecomposer(NumpyScalarDecomposer):
[docs] @staticmethod def supported_type(): return np.int32
[docs]class NumpyArrayDecomposer(fobs.Decomposer):
[docs] @staticmethod def supported_type(): return np.ndarray
[docs] def decompose(self, target: np.ndarray) -> Any: stream = BytesIO() np.save(stream, target, allow_pickle=False) return stream.getvalue()
[docs] def recompose(self, data: Any) -> np.ndarray: stream = BytesIO(data) return np.load(stream, allow_pickle=False)
[docs]class CtxPropReqDecomposer(fobs.Decomposer):
[docs] @staticmethod def supported_type(): return _CtxPropReq
[docs] def decompose(self, target: _CtxPropReq) -> Any: return [target.dtype, target.is_private, target.is_sticky, target.allow_none]
[docs] def recompose(self, data: Any) -> _CtxPropReq: return _CtxPropReq(data[0], data[1], data[2], data[3])
[docs]class EventReqDecomposer(fobs.Decomposer):
[docs] @staticmethod def supported_type(): return _EventReq
[docs] def decompose(self, target: _EventReq) -> Any: return [target.ctx_reqs, target.peer_ctx_reqs, target.ctx_block_list, target.peer_ctx_block_List]
[docs] def recompose(self, data: Any) -> _EventReq: return _EventReq(data[0], data[1], data[2], data[3])
[docs]class EventStatsDecomposer(fobs.Decomposer):
[docs] @staticmethod def supported_type(): return _EventStats
[docs] def decompose(self, target: _EventStats) -> Any: return [ target.call_count, target.prop_missing, target.prop_none_value, target.prop_dtype_mismatch, target.prop_attr_mismatch, target.prop_block_list_violation, target.peer_ctx_missing, ]
[docs] def recompose(self, data: Any) -> _EventStats: stats = _EventStats() stats.call_count = data[0] stats.prop_missing = data[1] stats.prop_none_value = data[2] stats.prop_dtype_mismatch = data[3] stats.prop_attr_mismatch = data[4] stats.prop_block_list_violation = data[5] stats.peer_ctx_missing = data[6] return stats
[docs]def register(): if register.registered: return fobs.register_folder(os.path.dirname(__file__), __package__) register.registered = True
register.registered = False