# Copyright (c) Microsoft. All rights reserved. import os import tempfile from collections.abc import Callable from copy import deepcopy from dataclasses import dataclass from pathlib import Path from typing import Union from unittest.mock import AsyncMock, MagicMock, patch import pytest from semantic_kernel import Kernel from semantic_kernel.connectors.ai.chat_completion_client_base import ChatCompletionClientBase from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior from semantic_kernel.connectors.ai.open_ai.services.open_ai_chat_completion import OpenAIChatCompletion from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings from semantic_kernel.const import METADATA_EXCEPTION_KEY from semantic_kernel.contents import ChatMessageContent from semantic_kernel.contents.chat_history import ChatHistory from semantic_kernel.contents.function_call_content import FunctionCallContent from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent from semantic_kernel.exceptions import KernelFunctionAlreadyExistsError, KernelServiceNotFoundError from semantic_kernel.exceptions.content_exceptions import FunctionCallInvalidArgumentsException from semantic_kernel.exceptions.kernel_exceptions import ( KernelFunctionNotFoundError, KernelInvokeException, KernelPluginNotFoundError, OperationCancelledException, ) from semantic_kernel.exceptions.template_engine_exceptions import TemplateSyntaxError from semantic_kernel.filters.filter_types import FilterTypes from semantic_kernel.functions.function_result import FunctionResult from semantic_kernel.functions.kernel_arguments import KernelArguments from semantic_kernel.functions.kernel_function import KernelFunction from semantic_kernel.functions.kernel_function_decorator import kernel_function from semantic_kernel.functions.kernel_function_metadata import KernelFunctionMetadata from semantic_kernel.functions.kernel_parameter_metadata import KernelParameterMetadata from semantic_kernel.functions.kernel_plugin import KernelPlugin from semantic_kernel.prompt_template.kernel_prompt_template import KernelPromptTemplate from semantic_kernel.prompt_template.prompt_template_config import PromptTemplateConfig from semantic_kernel.services.ai_service_client_base import AIServiceClientBase from semantic_kernel.services.ai_service_selector import AIServiceSelector # region Init def test_init(): kernel = Kernel() assert kernel is not None assert kernel.ai_service_selector is not None assert kernel.plugins is not None assert kernel.services is not None assert kernel.retry_mechanism is not None assert kernel.function_invocation_filters is not None assert kernel.prompt_rendering_filters is not None def test_kernel_init_with_ai_service_selector(): ai_service_selector = AIServiceSelector() kernel = Kernel(ai_service_selector=ai_service_selector) assert kernel.ai_service_selector is not None def test_kernel_init_with_services(service: AIServiceClientBase): kernel = Kernel(services=service) assert kernel.services is not None assert kernel.services["service"] is not None def test_kernel_init_with_services_dict(service: AIServiceClientBase): kernel = Kernel(services={"service": service}) assert kernel.services is not None assert kernel.services["service"] is not None def test_kernel_init_with_services_list(service: AIServiceClientBase): kernel = Kernel(services=[service]) assert kernel.services is not None assert kernel.services["service"] is not None def test_kernel_init_with_plugins(): plugins = {"plugin": KernelPlugin(name="plugin")} kernel = Kernel(plugins=plugins) assert kernel.plugins is not None def test_kernel_init_with_kernel_plugin_instance(): plugin = KernelPlugin(name="plugin") kernel = Kernel(plugins=plugin) assert kernel.plugins is not None def test_kernel_init_with_kernel_plugin_list(): plugin = [KernelPlugin(name="plugin")] kernel = Kernel(plugins=plugin) assert kernel.plugins is not None # endregion # region Invoke Functions async def test_invoke_function(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") await kernel.invoke(mock_function, KernelArguments()) assert mock_function.call_count == 1 async def test_invoke_function_with_cancellation(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") with ( patch.object(Kernel, "invoke", AsyncMock(side_effect=OperationCancelledException("Operation cancelled"))), pytest.raises(OperationCancelledException), ): result = await kernel.invoke(function=mock_function, arguments=KernelArguments()) assert result is None Kernel.invoke.assert_called_once_with(function=mock_function, arguments=KernelArguments()) async def test_invoke_functions_by_name(kernel: Kernel, create_mock_function): mock_function = kernel.add_function(plugin_name="test", function=create_mock_function(name="test_function")) result = await kernel.invoke(function_name="test_function", plugin_name="test", arguments=KernelArguments()) assert str(result) == "test" assert mock_function.call_count == 1 async for response in kernel.invoke_stream(function_name="test_function", plugin_name="test"): assert response[0].text == "test" async def test_invoke_functions_by_name_return_function_results(kernel: Kernel, create_mock_function): mock_function = kernel.add_function(plugin_name="test", function=create_mock_function(name="test_function")) result = await kernel.invoke(function_name="test_function", plugin_name="test", arguments=KernelArguments()) assert str(result) == "test" assert mock_function.call_count == 1 async for _ in kernel.invoke_stream( function_name="test_function", plugin_name="test", return_function_results=True ): assert isinstance(result, FunctionResult) async def test_invoke_function_fail(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") kernel.add_plugin(KernelPlugin(name="test", functions=[mock_function])) with pytest.raises(KernelFunctionNotFoundError): await kernel.invoke(arguments=KernelArguments()) with pytest.raises(KernelFunctionNotFoundError): async for _ in kernel.invoke_stream(arguments=KernelArguments()): pass async def test_invoke_function_cancelled(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") mock_function._invoke_internal = AsyncMock(side_effect=OperationCancelledException("Operation cancelled")) kernel.add_plugin(KernelPlugin(name="test", functions=[mock_function])) result = await kernel.invoke(mock_function, arguments=KernelArguments()) assert result is None async def test_invoke_stream_function(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") mock_function = kernel.add_function(plugin_name="test", function=mock_function) async for part in kernel.invoke_stream(mock_function, input="test"): assert part[0].text == "test" assert mock_function.call_count == 1 async def test_invoke_stream_functions_throws_exception(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") kernel.add_plugin(KernelPlugin(name="test", functions=[mock_function])) functions = [mock_function] function_result_with_exception = FunctionResult( value="", function=mock_function.metadata, output=None, metadata={METADATA_EXCEPTION_KEY: "Test Exception"} ) with patch("semantic_kernel.kernel.Kernel.invoke_stream", return_value=AsyncMock()) as mocked_invoke_stream: mocked_invoke_stream.return_value.__aiter__.return_value = [function_result_with_exception] async for part in kernel.invoke_stream(functions, input="test"): assert METADATA_EXCEPTION_KEY in part.metadata, "Expected exception metadata in the FunctionResult." assert part.metadata[METADATA_EXCEPTION_KEY] == "Test Exception", "The exception message does not match." break async def test_invoke_prompt(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") with patch( "semantic_kernel.functions.kernel_function_from_prompt.KernelFunctionFromPrompt._invoke_internal", return_value=FunctionResult(function=mock_function.metadata, value="test"), ) as mock_invoke: await kernel.invoke_prompt(prompt="test", plugin_name="test", function_name="test", arguments=KernelArguments()) mock_invoke.invoke.call_count == 1 async def test_invoke_prompt_no_prompt_error(kernel: Kernel): with pytest.raises(TemplateSyntaxError): await kernel.invoke_prompt( function_name="test_function", plugin_name="test_plugin", prompt="", ) async def test_invoke_prompt_stream_no_prompt_throws(kernel: Kernel): with pytest.raises(TemplateSyntaxError): async for _ in kernel.invoke_prompt_stream(prompt=""): pass async def test_invoke_prompt_stream(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") with ( patch( "semantic_kernel.kernel.Kernel.invoke_stream", ) as mock_stream_invoke, ): mock_stream_invoke.return_value.__aiter__.return_value = [ FunctionResult(function=mock_function.metadata, value="test") ] async for response in kernel.invoke_prompt_stream(prompt="test"): assert response.value == "test" async def test_invoke_prompt_stream_returns_function_results(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") with ( patch( "semantic_kernel.kernel.Kernel.invoke_stream", ) as mock_stream_invoke, ): mock_stream_invoke.return_value.__aiter__.return_value = [ FunctionResult(function=mock_function.metadata, value="test") ] async for response in kernel.invoke_prompt_stream(prompt="test", return_function_results=True): assert isinstance(response, FunctionResult) async def test_invoke_prompt_stream_raises_exception(kernel: Kernel, create_mock_function): mock_function = create_mock_function(name="test_function") with ( patch( "semantic_kernel.kernel.Kernel.invoke_stream", ) as mock_stream_invoke, pytest.raises(KernelInvokeException), ): mock_stream_invoke.return_value.__aiter__.return_value = [ FunctionResult( function=mock_function.metadata, value="", metadata={METADATA_EXCEPTION_KEY: KernelInvokeException()} ) ] async for _ in kernel.invoke_prompt_stream(prompt="test"): pass async def test_invoke_function_call(kernel: Kernel, get_tool_call_mock): tool_call_mock = get_tool_call_mock result_mock = MagicMock(spec=ChatMessageContent) result_mock.items = [tool_call_mock] chat_history_mock = MagicMock(spec=ChatHistory) func_mock = AsyncMock(spec=KernelFunction) func_meta = KernelFunctionMetadata(name="function", is_prompt=False) func_mock.metadata = func_meta func_mock.name = "function" func_result = FunctionResult(value="Function result", function=func_meta) func_mock.invoke = MagicMock(return_value=func_result) arguments = KernelArguments() with ( patch("semantic_kernel.kernel.logger", autospec=True), patch("semantic_kernel.kernel.Kernel.get_list_of_function_metadata", return_value=[func_meta]), ): await kernel.invoke_function_call( function_call=tool_call_mock, chat_history=chat_history_mock, arguments=arguments, function_call_count=1, request_index=0, function_behavior=FunctionChoiceBehavior.Auto(filters={"included_functions": ["function"]}), ) class LightsPlugin: lights = [ {"id": 1, "name": "Table Lamp", "is_on": False}, {"id": 2, "name": "Porch light", "is_on": False}, {"id": 3, "name": "Chandelier", "is_on": False}, ] @kernel_function( name="get_lights_states", description="Gets a list of lights and their current state", ) def get_lights_states(self) -> list[dict]: """Return a list of lights and their states (read-only).""" return self.lights # returns by reference, but deep-copy fix will snapshot it @pytest.mark.asyncio async def test_kernel_invoke_deep_copy_preserves_previous_state(): kernel = Kernel() plugin = LightsPlugin() kernel.add_plugin(plugin, plugin_name="lights") func_call = FunctionCallContent( plugin_name="lights", function_name="get_lights_states", name="get_lights_states", arguments={} ) chat_history = ChatHistory() # 1st invocation: capture snapshot of the list _ = await kernel.invoke_function_call( function_call=func_call, chat_history=chat_history, arguments=None, function_call_count=1, request_index=0, function_behavior=FunctionChoiceBehavior.Auto(), ) snapshot1 = chat_history.messages[-1].items[0].result # Mutate the plugin's internal state after the first call plugin.lights.append({"id": 4, "name": "Desk lamp", "is_on": True}) # The snapshot from the first invocation did not change assert snapshot1 == [ {"id": 1, "name": "Table Lamp", "is_on": False}, {"id": 2, "name": "Porch light", "is_on": False}, {"id": 3, "name": "Chandelier", "is_on": False}, ] # 2nd invocation: should pick up the new item _ = await kernel.invoke_function_call( function_call=func_call, chat_history=chat_history, arguments=None, function_call_count=1, request_index=1, function_behavior=FunctionChoiceBehavior.Auto(), ) snapshot2 = chat_history.messages[-1].items[0].result # The new invoke reflects the mutated state assert snapshot2[-1] == {"id": 4, "name": "Desk lamp", "is_on": True} # region Clone safety with unpickleable async context class _AsyncGenPlugin: """Plugin holding an async generator to emulate MCP-style async internals. Deep-copying objects that reference async generators typically fails with `TypeError: cannot pickle 'async_generator' object`. """ def __init__(self): async def _agen(): yield "tick" # Store an async generator object on the instance to make it unpickleable. self._unpickleable_async_gen = _agen() @kernel_function(name="do", description="Return OK to validate plugin wiring") async def do(self) -> str: return "ok" @pytest.mark.asyncio async def test_kernel_clone_with_unpickleable_plugin_does_not_raise(): kernel = Kernel() plugin_instance = _AsyncGenPlugin() kernel.add_plugin(plugin_instance) # Sanity: naive deepcopy of plugins should raise due to async generator state with pytest.raises(TypeError): deepcopy(kernel.plugins) # Clone should succeed and preserve function usability cloned = kernel.clone() func = cloned.get_function(plugin_instance.__class__.__name__, "do") result = await func.invoke(cloned) assert result is not None assert result.value == "ok" # endregion async def test_invoke_function_call_throws_during_invoke(kernel: Kernel, get_tool_call_mock): tool_call_mock = get_tool_call_mock result_mock = MagicMock(spec=ChatMessageContent) result_mock.items = [tool_call_mock] chat_history_mock = MagicMock(spec=ChatHistory) chat_history_mock.messages = [MagicMock(spec=StreamingChatMessageContent)] func_mock = AsyncMock(spec=KernelFunction) func_meta = KernelFunctionMetadata(name="function", is_prompt=False) func_mock.metadata = func_meta func_mock.name = "function" func_result = FunctionResult(value="Function result", function=func_meta) func_mock.invoke = MagicMock(return_value=func_result) arguments = KernelArguments() with ( patch("semantic_kernel.kernel.logger", autospec=True), patch("semantic_kernel.kernel.Kernel.get_list_of_function_metadata", return_value=[func_meta]), patch("semantic_kernel.kernel.Kernel.get_function", return_value=func_mock), ): await kernel.invoke_function_call( function_call=tool_call_mock, chat_history=chat_history_mock, arguments=arguments, function_call_count=1, request_index=0, function_behavior=FunctionChoiceBehavior.Auto(), ) async def test_invoke_function_call_non_allowed_func_throws(kernel: Kernel, get_tool_call_mock): tool_call_mock = get_tool_call_mock result_mock = MagicMock(spec=ChatMessageContent) result_mock.items = [tool_call_mock] chat_history_mock = MagicMock(spec=ChatHistory) func_mock = AsyncMock(spec=KernelFunction) func_meta = KernelFunctionMetadata(name="function", is_prompt=False) func_mock.metadata = func_meta func_mock.name = "function" func_result = FunctionResult(value="Function result", function=func_meta) func_mock.invoke = MagicMock(return_value=func_result) arguments = KernelArguments() with patch("semantic_kernel.kernel.logger", autospec=True): await kernel.invoke_function_call( function_call=tool_call_mock, chat_history=chat_history_mock, arguments=arguments, function_call_count=1, request_index=0, function_behavior=FunctionChoiceBehavior.Auto(filters={"included_functions": ["unknown"]}), ) async def test_invoke_function_call_no_name_throws(kernel: Kernel, get_tool_call_mock): tool_call_mock = get_tool_call_mock tool_call_mock.name = None result_mock = MagicMock(spec=ChatMessageContent) result_mock.items = [tool_call_mock] chat_history_mock = MagicMock(spec=ChatHistory) func_mock = AsyncMock(spec=KernelFunction) func_meta = KernelFunctionMetadata(name="function", is_prompt=False) func_mock.metadata = func_meta func_mock.name = "function" func_result = FunctionResult(value="Function result", function=func_meta) func_mock.invoke = MagicMock(return_value=func_result) arguments = KernelArguments() with ( patch("semantic_kernel.kernel.logger", autospec=True), ): await kernel.invoke_function_call( function_call=tool_call_mock, chat_history=chat_history_mock, arguments=arguments, function_call_count=1, request_index=0, function_behavior=FunctionChoiceBehavior.Auto(), ) async def test_invoke_function_call_not_enough_parsed_args(kernel: Kernel, get_tool_call_mock): tool_call_mock = get_tool_call_mock tool_call_mock.to_kernel_arguments.return_value = {} result_mock = MagicMock(spec=ChatMessageContent) result_mock.items = [tool_call_mock] chat_history_mock = MagicMock(spec=ChatHistory) func_mock = AsyncMock(spec=KernelFunction) func_meta = KernelFunctionMetadata(name="function", is_prompt=False) func_mock.metadata = func_meta func_mock.name = "function" func_mock.parameters = [KernelParameterMetadata(name="param1", is_required=True)] func_result = FunctionResult(value="Function result", function=func_meta) func_mock.invoke = MagicMock(return_value=func_result) arguments = KernelArguments() with ( patch("semantic_kernel.kernel.logger", autospec=True), patch("semantic_kernel.kernel.Kernel.get_function", return_value=func_mock), ): await kernel.invoke_function_call( function_call=tool_call_mock, chat_history=chat_history_mock, arguments=arguments, function_call_count=1, request_index=0, function_behavior=FunctionChoiceBehavior.Auto(), ) async def test_invoke_function_call_with_continuation_on_malformed_arguments(kernel: Kernel, get_tool_call_mock): tool_call_mock = MagicMock(spec=FunctionCallContent) tool_call_mock.to_kernel_arguments.side_effect = FunctionCallInvalidArgumentsException("Malformed arguments") tool_call_mock.name = "test-function" tool_call_mock.function_name = "function" tool_call_mock.plugin_name = "test" tool_call_mock.arguments = {"arg_name": "arg_value"} tool_call_mock.ai_model_id = None tool_call_mock.metadata = {} tool_call_mock.index = 0 tool_call_mock.to_kernel_arguments.return_value = {"arg_name": "arg_value"} tool_call_mock.id = "test_id" result_mock = MagicMock(spec=ChatMessageContent) result_mock.items = [tool_call_mock] chat_history_mock = MagicMock(spec=ChatHistory) func_mock = MagicMock(spec=KernelFunction) func_meta = KernelFunctionMetadata(name="function", is_prompt=False) func_mock.metadata = func_meta func_mock.name = "function" func_result = FunctionResult(value="Function result", function=func_meta) func_mock.invoke = AsyncMock(return_value=func_result) arguments = KernelArguments() with ( patch("semantic_kernel.kernel.logger", autospec=True) as logger_mock, patch("semantic_kernel.kernel.Kernel.get_function", return_value=func_mock), ): await kernel.invoke_function_call( function_call=tool_call_mock, chat_history=chat_history_mock, arguments=arguments, function_call_count=1, request_index=0, function_behavior=FunctionChoiceBehavior.Auto(), ) logger_mock.info.assert_any_call( "Received invalid arguments for function test-function: Malformed arguments. Trying tool call again." ) add_message_calls = chat_history_mock.add_message.call_args_list assert any( call[1]["message"].items[0].result == "The tool call arguments are malformed. Arguments must be in JSON format. Please try again." # noqa: E501 and call[1]["message"].items[0].id == "test_id" and call[1]["message"].items[0].name == "test-function" for call in add_message_calls ), "Expected call to add_message not found with the expected message content and metadata." async def test_invoke_function_call_with_missing_or_unexpected_args(kernel: Kernel): tool_call_mock = MagicMock(spec=FunctionCallContent) tool_call_mock.to_kernel_arguments.return_value = {"unexpected_arg": "value"} tool_call_mock.name = "test-function" tool_call_mock.function_name = "function" tool_call_mock.plugin_name = "test" tool_call_mock.arguments = {"unexpected_arg": "value"} tool_call_mock.ai_model_id = None tool_call_mock.metadata = {} tool_call_mock.index = 0 tool_call_mock.id = "test_id" chat_history_mock = MagicMock(spec=ChatHistory) # One required parameter called "required_arg" required_param = KernelParameterMetadata(name="required_arg", is_required=True) func_meta = KernelFunctionMetadata(name="function", is_prompt=False, parameters=[required_param]) func_mock = MagicMock(spec=KernelFunction) func_mock.metadata = func_meta func_mock.name = "function" func_mock.parameters = [required_param] with ( patch("semantic_kernel.kernel.logger", autospec=True) as logger_mock, patch("semantic_kernel.kernel.Kernel.get_function", return_value=func_mock), ): result = await kernel.invoke_function_call( function_call=tool_call_mock, chat_history=chat_history_mock, arguments=KernelArguments(), function_call_count=1, request_index=0, function_behavior=FunctionChoiceBehavior.Auto(), ) assert result is None logger_msg = ( "Missing required argument(s): ['required_arg']. " "Received unexpected argument(s): ['unexpected_arg']. " "Please revise the arguments to match the function signature." ) logger_mock.info.assert_called_with(logger_msg) add_message_calls = chat_history_mock.add_message.call_args_list assert any( call[1]["message"].items[0].result.startswith("Missing required argument") and call[1]["message"].items[0].id == "test_id" for call in add_message_calls ), "Expected fallback message not found in chat history." # endregion # region Plugins def test_add_plugin_from_directory(kernel: Kernel): # import plugins plugins_directory = os.path.join(os.path.dirname(__file__), "../../assets", "test_plugins") # path to plugins directory plugin = kernel.add_plugin(plugin_name="TestPlugin", parent_directory=plugins_directory) assert plugin is not None assert len(plugin.functions) == 2 func = plugin.functions["TestFunction"] assert func is not None func_handlebars = plugin.functions["TestFunctionHandlebars"] assert func_handlebars is not None def test_add_plugin_from_directory_with_encoding(kernel: Kernel): """Test kernel.add_plugin with custom encoding parameter.""" with tempfile.TemporaryDirectory() as temp_dir: # Create a plugin directory with UTF-8 content plugin_dir = os.path.join(temp_dir, "test_encoding_plugin") os.makedirs(plugin_dir) function_dir = os.path.join(plugin_dir, "test_function") os.makedirs(function_dir) prompt_path = os.path.join(function_dir, "skprompt.txt") config_path = os.path.join(function_dir, "config.json") # UTF-8 content with international characters # Hello World Test prompt_content = """Multi-language assistant: Chinese: 你好世界! Japanese: こんにちは世界! Question: {{$input}} """ config_content = """{ "schema": 1, "description": "Test encoding function", "input_variables": [ { "name": "input", "description": "User's question", "required": true } ] }""" # Write files with UTF-8 encoding with open(prompt_path, "w", encoding="utf-8") as f: f.write(prompt_content) with open(config_path, "w", encoding="utf-8") as f: f.write(config_content) # Test with explicit encoding plugin = kernel.add_plugin(parent_directory=temp_dir, plugin_name="test_encoding_plugin", encoding="utf-8") assert plugin is not None assert plugin.name == "test_encoding_plugin" assert "test_function" in plugin.functions function = plugin.functions["test_function"] template = function.prompt_template.prompt_template_config.template # Assert "Hello World" assert "你好世界" in template assert "こんにちは世界" in template assert function.description == "Test encoding function" def test_plugin_no_plugin(kernel: Kernel): with pytest.raises(ValueError): kernel.add_plugin(plugin_name="test") def test_plugin_name_from_class_name(kernel: Kernel): kernel.add_plugin(" ", None) assert "str" in kernel.plugins def test_plugin_name_from_name_attribute(kernel: Kernel): @dataclass class TestPlugin: name: str = "test_plugin" kernel.add_plugin(TestPlugin(), None) assert "test_plugin" in kernel.plugins def test_plugin_name_not_string_error(kernel: Kernel): with pytest.raises(TypeError): kernel.add_plugin(" ", plugin_name=Path(__file__).parent) def test_plugins_add_plugins(kernel: Kernel): plugin1 = KernelPlugin(name="TestPlugin") plugin2 = KernelPlugin(name="TestPlugin2") kernel.add_plugins([plugin1, plugin2]) assert len(kernel.plugins) == 2 def test_add_function_from_prompt(kernel: Kernel): prompt = """ Write a short story about two Corgis on an adventure. The story must be: - G rated - Have a positive message - No sexism, racism or other bias/bigotry - Be exactly {{$paragraph_count}} paragraphs long - Be written in this language: {{$language}} - The two names of the corgis are {{GenerateNames.generate_names}} """ kernel.add_function( prompt=prompt, function_name="TestFunction", plugin_name="TestPlugin", description="Write a short story.", execution_settings=PromptExecutionSettings( extension_data={"max_tokens": 500, "temperature": 0.5, "top_p": 0.5} ), ) func = kernel.get_function("TestPlugin", "TestFunction") assert func.name == "TestFunction" assert func.description == "Write a short story." assert len(func.parameters) == 2 def test_add_function_not_provided(kernel: Kernel): with pytest.raises(ValueError): kernel.add_function(function_name="TestFunction", plugin_name="TestPlugin") def test_add_function_from_prompt_different_values(kernel: Kernel): template = """ Write a short story about two Corgis on an adventure. The story must be: - G rated - Have a positive message - No sexism, racism or other bias/bigotry - Be exactly {{$paragraph_count}} paragraphs long - Be written in this language: {{$language}} - The two names of the corgis are {{GenerateNames.generate_names}} """ prompt = "test" kernel.add_function( prompt=prompt, function_name="TestFunction", plugin_name="TestPlugin", description="Write a short story.", template_format="handlebars", prompt_template_config=PromptTemplateConfig( template=template, ), execution_settings=PromptExecutionSettings( extension_data={"max_tokens": 500, "temperature": 0.5, "top_p": 0.5} ), ) func = kernel.get_function("TestPlugin", "TestFunction") assert func.name == "TestFunction" assert func.description == "Write a short story." assert isinstance(func.prompt_template, KernelPromptTemplate) assert len(func.parameters) == 2 def test_add_functions(kernel: Kernel): @kernel_function(name="func1") def func1(arg1: str) -> str: return "test" @kernel_function(name="func2") def func2(arg1: str) -> str: return "test" plugin = kernel.add_functions(plugin_name="test", functions=[func1, func2]) assert len(plugin.functions) == 2 def test_add_functions_to_existing(kernel: Kernel): kernel.add_plugin(KernelPlugin(name="test")) @kernel_function(name="func1") def func1(arg1: str) -> str: return "test" @kernel_function(name="func2") def func2(arg1: str) -> str: return "test" plugin = kernel.add_functions(plugin_name="test", functions=[func1, func2]) assert len(plugin.functions) == 2 def test_import_plugin_from_openapi(kernel: Kernel): openapi_spec_file = os.path.join( os.path.dirname(__file__), "../../assets/test_plugins", "TestOpenAPIPlugin", "akv-openapi.yaml" ) kernel.add_plugin_from_openapi( plugin_name="TestOpenAPIPlugin", openapi_document_path=openapi_spec_file, ) plugin = kernel.get_plugin(plugin_name="TestOpenAPIPlugin") assert plugin is not None assert plugin.name == "TestOpenAPIPlugin" assert plugin.functions.get("GetSecret") is not None assert plugin.functions.get("SetSecret") is not None def test_get_plugin(kernel: Kernel): kernel.add_plugin(KernelPlugin(name="TestPlugin")) plugin = kernel.get_plugin("TestPlugin") assert plugin is not None def test_get_plugin_not_found(kernel: Kernel): with pytest.raises(KernelPluginNotFoundError): kernel.get_plugin("TestPlugin2") def test_get_function(kernel: Kernel, custom_plugin_class): kernel.add_plugin(custom_plugin_class(), "TestPlugin") func = kernel.get_function("TestPlugin", "getLightStatus") assert func def test_func_plugin_not_found(kernel: Kernel): with pytest.raises(KernelPluginNotFoundError): kernel.get_function("TestPlugin", "TestFunction") def test_func_function_not_found(kernel: Kernel, custom_plugin_class): kernel.add_plugin(custom_plugin_class(), "TestPlugin") with pytest.raises(KernelFunctionNotFoundError): kernel.get_function("TestPlugin", "TestFunction") def test_get_function_from_fqn(kernel: Kernel, custom_plugin_class): kernel.add_plugin(custom_plugin_class(), "TestPlugin") func = kernel.get_function_from_fully_qualified_function_name("TestPlugin-getLightStatus") assert func def test_get_function_from_fqn_wo_plugin(kernel: Kernel, custom_plugin_class): kernel.add_plugin(custom_plugin_class(), "TestPlugin") func = kernel.get_function_from_fully_qualified_function_name("getLightStatus") assert func # endregion # region Services def test_kernel_add_service(kernel: Kernel, service: AIServiceClientBase): kernel.add_service(service) assert kernel.services == {"service": service} def test_kernel_add_service_twice(kernel_with_service: Kernel, service: AIServiceClientBase): with pytest.raises(KernelFunctionAlreadyExistsError): kernel_with_service.add_service(service) assert kernel_with_service.services == {"service": service} def test_kernel_add_multiple_services(kernel_with_service: Kernel, service: AIServiceClientBase): service2 = AIServiceClientBase(service_id="service2", ai_model_id="ai_model_id") kernel_with_service.add_service(service2) assert kernel_with_service.services["service2"] == service2 assert len(kernel_with_service.services) == 2 def test_kernel_remove_service(kernel_with_service: Kernel): kernel_with_service.remove_service("service") assert kernel_with_service.services == {} def test_kernel_remove_service_error(kernel_with_service: Kernel): with pytest.raises(KernelServiceNotFoundError): kernel_with_service.remove_service("service2") def test_kernel_remove_all_service(kernel_with_service: Kernel): kernel_with_service.remove_all_services() assert kernel_with_service.services == {} def test_get_default_service(kernel_with_default_service: Kernel): service_get = kernel_with_default_service.get_service() assert service_get == kernel_with_default_service.services["default"] def test_get_default_service_with_type(kernel_with_default_service: Kernel): service_get = kernel_with_default_service.get_service(type=AIServiceClientBase) assert service_get == kernel_with_default_service.services["default"] def test_get_service(kernel_with_service: Kernel): service_get = kernel_with_service.get_service("service") assert service_get == kernel_with_service.services["service"] def test_get_service_by_type(kernel_with_service: Kernel): service_get = kernel_with_service.get_service(type=AIServiceClientBase) assert service_get == kernel_with_service.services["service"] def test_get_service_by_type_not_found(kernel_with_service: Kernel): with pytest.raises(KernelServiceNotFoundError): kernel_with_service.get_service(type=ChatCompletionClientBase) def test_get_default_service_by_type(kernel_with_default_service: Kernel): service_get = kernel_with_default_service.get_services_by_type(AIServiceClientBase) assert service_get["default"] == kernel_with_default_service.services["default"] def test_get_services_by_type(kernel_with_service: Kernel): service_get = kernel_with_service.get_services_by_type(AIServiceClientBase) assert service_get["service"] == kernel_with_service.services["service"] def test_get_service_with_id_not_found(kernel_with_service: Kernel): with pytest.raises(KernelServiceNotFoundError): kernel_with_service.get_service("service2", type=AIServiceClientBase) def test_get_service_with_type(kernel_with_service: Kernel): service_get = kernel_with_service.get_service("service", type=AIServiceClientBase) assert service_get == kernel_with_service.services["service"] def test_get_service_with_multiple_types(kernel_with_service: Kernel): service_get = kernel_with_service.get_service("service", type=(AIServiceClientBase, ChatCompletionClientBase)) assert service_get == kernel_with_service.services["service"] def test_get_service_with_multiple_types_union(kernel_with_service: Kernel): """This is valid syntax only in python 3.10+. It is skipped for older versions.""" service_get = kernel_with_service.get_service("service", type=Union[AIServiceClientBase, ChatCompletionClientBase]) assert service_get == kernel_with_service.services["service"] def test_get_service_with_type_not_found(kernel_with_service: Kernel): with pytest.raises(KernelServiceNotFoundError): kernel_with_service.get_service("service", type=ChatCompletionClientBase) def test_get_service_no_id(kernel_with_service: Kernel): service_get = kernel_with_service.get_service() assert service_get == kernel_with_service.services["service"] def test_instantiate_prompt_execution_settings_through_kernel(kernel_with_service: Kernel): settings = kernel_with_service.get_prompt_execution_settings_from_service_id("service") assert settings.service_id == "service" # endregion # region experimental class decorator def test_experimental_class_has_decorator_and_flag(experimental_plugin_class): assert hasattr(experimental_plugin_class, "is_experimental") assert experimental_plugin_class.is_experimental assert "This class is marked as 'experimental' and may change in the future" in experimental_plugin_class.__doc__ # endregion # region copy and clone def test_kernel_model_dump( kernel: Kernel, custom_plugin_class: type, auto_function_invocation_filter: Callable, ): kernel.add_plugin(custom_plugin_class(), "TestPlugin") kernel.add_filter(FilterTypes.AUTO_FUNCTION_INVOCATION, auto_function_invocation_filter) kernel_dict = kernel.model_dump() assert kernel_dict is not None assert kernel_dict["plugins"] is not None and len(kernel_dict["plugins"]) > 0 assert ( kernel_dict["auto_function_invocation_filters"] is not None and len(kernel_dict["auto_function_invocation_filters"]) > 0 ) def test_kernel_deep_copy( kernel: Kernel, custom_plugin_class: type, auto_function_invocation_filter: Callable, ): kernel.add_plugin(custom_plugin_class(), "TestPlugin") kernel.add_filter(FilterTypes.AUTO_FUNCTION_INVOCATION, auto_function_invocation_filter) kernel_copy = kernel.model_copy(deep=True) assert kernel_copy is not None assert kernel_copy.plugins is not None and len(kernel_copy.plugins) > 0 assert ( kernel_copy.auto_function_invocation_filters is not None and len(kernel_copy.auto_function_invocation_filters) > 0 ) def test_kernel_model_dump_fail_with_services(kernel: Kernel): open_ai_chat_completion = OpenAIChatCompletion(ai_model_id="abc", api_key="abc") kernel.add_service(open_ai_chat_completion) with pytest.raises(TypeError): # This will fail because OpenAIChatCompletion is not serializable, more specifically, # the client is not serializable kernel.model_dump(deep=True) def test_kernel_deep_copy_fail_with_services(kernel: Kernel): open_ai_chat_completion = OpenAIChatCompletion(ai_model_id="abc", api_key="abc") kernel.add_service(open_ai_chat_completion) with pytest.raises(TypeError): # This will fail because OpenAIChatCompletion is not serializable, more specifically, # the client is not serializable kernel.model_copy(deep=True) def test_kernel_clone( kernel: Kernel, custom_plugin_class: type, auto_function_invocation_filter: Callable, ): kernel.add_service(OpenAIChatCompletion(ai_model_id="abc", api_key="abc")) kernel.add_plugin(custom_plugin_class(), "TestPlugin") kernel.add_filter(FilterTypes.AUTO_FUNCTION_INVOCATION, auto_function_invocation_filter) kernel_clone = kernel.clone() # Assert the clone has all the same properties as the original kernel assert kernel_clone is not None assert kernel_clone.plugins is not None and len(kernel_clone.plugins) > 0 assert ( kernel_clone.auto_function_invocation_filters is not None and len(kernel_clone.auto_function_invocation_filters) > 0 ) assert kernel_clone.services is not None and len(kernel_clone.services) > 0 # Assert the clone is a deep copy kernel_clone.plugins["TestPlugin"].functions["getLightStatus"].metadata.name = "getLightStatus2" assert kernel.plugins["TestPlugin"].functions["getLightStatus"].metadata.name == "getLightStatus" kernel_clone.plugins.clear() kernel_clone.remove_filter(filter_type=FilterTypes.AUTO_FUNCTION_INVOCATION, position=0) kernel_clone.remove_all_services() assert kernel.plugins is not None and len(kernel.plugins) > 0 assert kernel.auto_function_invocation_filters is not None and len(kernel.auto_function_invocation_filters) > 0 assert kernel.services is not None and len(kernel.services) > 0 # endregion