Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
|
|
|
|
|
|
import contextlib
|
|
|
|
|
import inspect
|
Python: Allow for factory callbacks in the process framework (#10451)
### Motivation and Context
In the current Python process framework, in runtimes like Dapr, there is
no easy way to pass complex (unserializable) dependencies to a step --
this includes things like a ChatCompletion service or an agent of type
ChatCompletionAgent.
Similar to how the kernel dependency was propagated to the step_actor or
process_actor, we're introducing the ability to specify a factory
callback that will be called as the step is instantiated. The factory is
created, if specified via the optional kwarg when adding a step to the
process builder like:
```python
myBStep = process.add_step(step_type=BStep, factory_function=bstep_factory)
```
The `bstep_factory` looks like (along with its corresponding step)
```python
async def bstep_factory():
"""Creates a BStep instance with ephemeral references like ChatCompletionAgent."""
kernel = Kernel()
kernel.add_service(AzureChatCompletion())
agent = ChatCompletionAgent(kernel=kernel, name="echo", instructions="repeat the input back")
step_instance = BStep()
step_instance.agent = agent
return step_instance
class BStep(KernelProcessStep):
"""A sample BStep that optionally holds a ChatCompletionAgent.
By design, the agent is ephemeral (not stored in state).
"""
# Ephemeral references won't be persisted to Dapr
# because we do not place them in a step state model.
# We'll set this in the factory function:
agent: ChatCompletionAgent | None = None
@kernel_function(name="do_it")
async def do_it(self, context: KernelProcessStepContext):
print("##### BStep ran (do_it).")
await asyncio.sleep(2)
if self.agent:
history = ChatHistory()
history.add_user_message("Hello from BStep!")
async for msg in self.agent.invoke(history):
print(f"BStep got agent response: {msg.content}")
await context.emit_event(process_event="BStepDone", data="I did B")
```
Although this isn't explicitly necessary with the local runtime, the
factory callback will also work, if desired.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adds the ability to specify a factory callback for a step in the process
framework.
- Adjusts the Dapr FastAPI demo sample to show how one can include a
dependency like a `ChatCompletionAgent` and use the factory callback for
`BStep`. Although the output from the agent isn't needed, it
demonstrates the capability to handle these types of dependencies while
running Dapr.
- Adds unit tests
- Closes #10409
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-11 10:39:08 +09:00
|
|
|
from collections.abc import Callable
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
from copy import copy
|
2024-11-15 09:43:15 -05:00
|
|
|
from enum import Enum
|
2025-04-29 07:51:11 +09:00
|
|
|
from typing import TYPE_CHECKING, cast
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
|
|
|
|
|
from pydantic import Field
|
|
|
|
|
|
|
|
|
|
from semantic_kernel.exceptions.process_exceptions import ProcessInvalidConfigurationException
|
|
|
|
|
from semantic_kernel.processes.kernel_process.kernel_process_function_target import KernelProcessFunctionTarget
|
|
|
|
|
from semantic_kernel.processes.kernel_process.kernel_process_state import KernelProcessState
|
|
|
|
|
from semantic_kernel.processes.kernel_process.kernel_process_step_info import KernelProcessStepInfo
|
2025-04-29 07:51:11 +09:00
|
|
|
from semantic_kernel.processes.kernel_process.kernel_process_step_state_metadata import (
|
|
|
|
|
KernelProcessStateMetadata,
|
|
|
|
|
KernelProcessStepStateMetadata,
|
|
|
|
|
)
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
from semantic_kernel.processes.process_edge_builder import ProcessEdgeBuilder
|
|
|
|
|
from semantic_kernel.processes.process_function_target_builder import ProcessFunctionTargetBuilder
|
2025-04-29 07:51:11 +09:00
|
|
|
from semantic_kernel.processes.process_state_metadata_utils import to_process_state_metadata
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
from semantic_kernel.processes.process_step_builder import ProcessStepBuilder
|
|
|
|
|
from semantic_kernel.processes.process_step_edge_builder import ProcessStepEdgeBuilder
|
|
|
|
|
from semantic_kernel.processes.process_types import TState, TStep
|
Python: Allow for factory callbacks in the process framework (#10451)
### Motivation and Context
In the current Python process framework, in runtimes like Dapr, there is
no easy way to pass complex (unserializable) dependencies to a step --
this includes things like a ChatCompletion service or an agent of type
ChatCompletionAgent.
Similar to how the kernel dependency was propagated to the step_actor or
process_actor, we're introducing the ability to specify a factory
callback that will be called as the step is instantiated. The factory is
created, if specified via the optional kwarg when adding a step to the
process builder like:
```python
myBStep = process.add_step(step_type=BStep, factory_function=bstep_factory)
```
The `bstep_factory` looks like (along with its corresponding step)
```python
async def bstep_factory():
"""Creates a BStep instance with ephemeral references like ChatCompletionAgent."""
kernel = Kernel()
kernel.add_service(AzureChatCompletion())
agent = ChatCompletionAgent(kernel=kernel, name="echo", instructions="repeat the input back")
step_instance = BStep()
step_instance.agent = agent
return step_instance
class BStep(KernelProcessStep):
"""A sample BStep that optionally holds a ChatCompletionAgent.
By design, the agent is ephemeral (not stored in state).
"""
# Ephemeral references won't be persisted to Dapr
# because we do not place them in a step state model.
# We'll set this in the factory function:
agent: ChatCompletionAgent | None = None
@kernel_function(name="do_it")
async def do_it(self, context: KernelProcessStepContext):
print("##### BStep ran (do_it).")
await asyncio.sleep(2)
if self.agent:
history = ChatHistory()
history.add_user_message("Hello from BStep!")
async for msg in self.agent.invoke(history):
print(f"BStep got agent response: {msg.content}")
await context.emit_event(process_event="BStepDone", data="I did B")
```
Although this isn't explicitly necessary with the local runtime, the
factory callback will also work, if desired.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adds the ability to specify a factory callback for a step in the process
framework.
- Adjusts the Dapr FastAPI demo sample to show how one can include a
dependency like a `ChatCompletionAgent` and use the factory callback for
`BStep`. Although the output from the agent isn't needed, it
demonstrates the capability to handle these types of dependencies while
running Dapr.
- Adds unit tests
- Closes #10409
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-11 10:39:08 +09:00
|
|
|
from semantic_kernel.processes.step_utils import get_fully_qualified_name
|
Python: Introduce feature decorator to allow for experimental and release candidate decorator usage (#10691)
### Motivation and Context
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
This change is required to improve the flexibility and maintainability
of our feature annotation system. Previously, separate decorators (e.g.,
`experimental_function` and `experimental_class`) were used to mark
experimental features, resulting in code duplication and limiting our
ability to handle additional feature stages. As our SDK evolves, we need
a unified approach that can support multiple stages—such as
experimental, release candidate, and future states—while also allowing
version information for release candidate features to be centrally
managed.
### Description
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
This PR refactors our feature decorators by introducing a unified
`stage` decorator that updates the docstring and attaches metadata for
both functions and classes. Two convenience decorators, `experimental`
and `release_candidate`, are built on top of `stage`:
- The `experimental` decorator marks features as experimental and sets
an `is_experimental` attribute.
- The `release_candidate` decorator supports multiple usage patterns
(with or without parentheses and with an optional version parameter) to
mark features as release candidate and sets an `is_release_candidate`
attribute.
This unified approach reduces duplication, simplifies the codebase, and
lays the groundwork for easily extending feature stages in the future.
This decorator supports the following usage patterns:
- `@experimental` (for both classes and functions)
- `@release_candidate` (no parentheses)
- `@release_candidate()` (empty parentheses)
- `@release_candidate("1.21.3-rc1")` (positional version)
- `@release_candidate(version="1.21.3-rc1")` (keyword version)
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-27 09:17:54 +09:00
|
|
|
from semantic_kernel.utils.feature_stage_decorator import experimental
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
|
from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess
|
|
|
|
|
|
|
|
|
|
|
Python: Introduce feature decorator to allow for experimental and release candidate decorator usage (#10691)
### Motivation and Context
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
This change is required to improve the flexibility and maintainability
of our feature annotation system. Previously, separate decorators (e.g.,
`experimental_function` and `experimental_class`) were used to mark
experimental features, resulting in code duplication and limiting our
ability to handle additional feature stages. As our SDK evolves, we need
a unified approach that can support multiple stages—such as
experimental, release candidate, and future states—while also allowing
version information for release candidate features to be centrally
managed.
### Description
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
This PR refactors our feature decorators by introducing a unified
`stage` decorator that updates the docstring and attaches metadata for
both functions and classes. Two convenience decorators, `experimental`
and `release_candidate`, are built on top of `stage`:
- The `experimental` decorator marks features as experimental and sets
an `is_experimental` attribute.
- The `release_candidate` decorator supports multiple usage patterns
(with or without parentheses and with an optional version parameter) to
mark features as release candidate and sets an `is_release_candidate`
attribute.
This unified approach reduces duplication, simplifies the codebase, and
lays the groundwork for easily extending feature stages in the future.
This decorator supports the following usage patterns:
- `@experimental` (for both classes and functions)
- `@release_candidate` (no parentheses)
- `@release_candidate()` (empty parentheses)
- `@release_candidate("1.21.3-rc1")` (positional version)
- `@release_candidate(version="1.21.3-rc1")` (keyword version)
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-27 09:17:54 +09:00
|
|
|
@experimental
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
class ProcessBuilder(ProcessStepBuilder):
|
|
|
|
|
"""A builder for a process."""
|
|
|
|
|
|
|
|
|
|
entry_steps: list["ProcessStepBuilder"] = Field(default_factory=list)
|
|
|
|
|
external_event_target_map: dict[str, "ProcessFunctionTargetBuilder"] = Field(default_factory=dict)
|
|
|
|
|
has_parent_process: bool = False
|
2025-04-29 07:51:11 +09:00
|
|
|
version: str = "v1"
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
|
|
|
|
|
steps: list["ProcessStepBuilder"] = Field(default_factory=list)
|
Python: Allow for factory callbacks in the process framework (#10451)
### Motivation and Context
In the current Python process framework, in runtimes like Dapr, there is
no easy way to pass complex (unserializable) dependencies to a step --
this includes things like a ChatCompletion service or an agent of type
ChatCompletionAgent.
Similar to how the kernel dependency was propagated to the step_actor or
process_actor, we're introducing the ability to specify a factory
callback that will be called as the step is instantiated. The factory is
created, if specified via the optional kwarg when adding a step to the
process builder like:
```python
myBStep = process.add_step(step_type=BStep, factory_function=bstep_factory)
```
The `bstep_factory` looks like (along with its corresponding step)
```python
async def bstep_factory():
"""Creates a BStep instance with ephemeral references like ChatCompletionAgent."""
kernel = Kernel()
kernel.add_service(AzureChatCompletion())
agent = ChatCompletionAgent(kernel=kernel, name="echo", instructions="repeat the input back")
step_instance = BStep()
step_instance.agent = agent
return step_instance
class BStep(KernelProcessStep):
"""A sample BStep that optionally holds a ChatCompletionAgent.
By design, the agent is ephemeral (not stored in state).
"""
# Ephemeral references won't be persisted to Dapr
# because we do not place them in a step state model.
# We'll set this in the factory function:
agent: ChatCompletionAgent | None = None
@kernel_function(name="do_it")
async def do_it(self, context: KernelProcessStepContext):
print("##### BStep ran (do_it).")
await asyncio.sleep(2)
if self.agent:
history = ChatHistory()
history.add_user_message("Hello from BStep!")
async for msg in self.agent.invoke(history):
print(f"BStep got agent response: {msg.content}")
await context.emit_event(process_event="BStepDone", data="I did B")
```
Although this isn't explicitly necessary with the local runtime, the
factory callback will also work, if desired.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adds the ability to specify a factory callback for a step in the process
framework.
- Adjusts the Dapr FastAPI demo sample to show how one can include a
dependency like a `ChatCompletionAgent` and use the factory callback for
`BStep`. Although the output from the agent isn't needed, it
demonstrates the capability to handle these types of dependencies while
running Dapr.
- Adds unit tests
- Closes #10409
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-11 10:39:08 +09:00
|
|
|
factories: dict[str, Callable] = Field(default_factory=dict)
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
|
|
|
|
|
def add_step(
|
|
|
|
|
self,
|
|
|
|
|
step_type: type[TStep],
|
|
|
|
|
name: str | None = None,
|
|
|
|
|
initial_state: TState | None = None,
|
Python: Allow for factory callbacks in the process framework (#10451)
### Motivation and Context
In the current Python process framework, in runtimes like Dapr, there is
no easy way to pass complex (unserializable) dependencies to a step --
this includes things like a ChatCompletion service or an agent of type
ChatCompletionAgent.
Similar to how the kernel dependency was propagated to the step_actor or
process_actor, we're introducing the ability to specify a factory
callback that will be called as the step is instantiated. The factory is
created, if specified via the optional kwarg when adding a step to the
process builder like:
```python
myBStep = process.add_step(step_type=BStep, factory_function=bstep_factory)
```
The `bstep_factory` looks like (along with its corresponding step)
```python
async def bstep_factory():
"""Creates a BStep instance with ephemeral references like ChatCompletionAgent."""
kernel = Kernel()
kernel.add_service(AzureChatCompletion())
agent = ChatCompletionAgent(kernel=kernel, name="echo", instructions="repeat the input back")
step_instance = BStep()
step_instance.agent = agent
return step_instance
class BStep(KernelProcessStep):
"""A sample BStep that optionally holds a ChatCompletionAgent.
By design, the agent is ephemeral (not stored in state).
"""
# Ephemeral references won't be persisted to Dapr
# because we do not place them in a step state model.
# We'll set this in the factory function:
agent: ChatCompletionAgent | None = None
@kernel_function(name="do_it")
async def do_it(self, context: KernelProcessStepContext):
print("##### BStep ran (do_it).")
await asyncio.sleep(2)
if self.agent:
history = ChatHistory()
history.add_user_message("Hello from BStep!")
async for msg in self.agent.invoke(history):
print(f"BStep got agent response: {msg.content}")
await context.emit_event(process_event="BStepDone", data="I did B")
```
Although this isn't explicitly necessary with the local runtime, the
factory callback will also work, if desired.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adds the ability to specify a factory callback for a step in the process
framework.
- Adjusts the Dapr FastAPI demo sample to show how one can include a
dependency like a `ChatCompletionAgent` and use the factory callback for
`BStep`. Although the output from the agent isn't needed, it
demonstrates the capability to handle these types of dependencies while
running Dapr.
- Adds unit tests
- Closes #10409
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-11 10:39:08 +09:00
|
|
|
factory_function: Callable | None = None,
|
2025-04-29 07:51:11 +09:00
|
|
|
aliases: list[str] | None = None,
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
**kwargs,
|
|
|
|
|
) -> ProcessStepBuilder[TState, TStep]:
|
Python: Allow for factory callbacks in the process framework (#10451)
### Motivation and Context
In the current Python process framework, in runtimes like Dapr, there is
no easy way to pass complex (unserializable) dependencies to a step --
this includes things like a ChatCompletion service or an agent of type
ChatCompletionAgent.
Similar to how the kernel dependency was propagated to the step_actor or
process_actor, we're introducing the ability to specify a factory
callback that will be called as the step is instantiated. The factory is
created, if specified via the optional kwarg when adding a step to the
process builder like:
```python
myBStep = process.add_step(step_type=BStep, factory_function=bstep_factory)
```
The `bstep_factory` looks like (along with its corresponding step)
```python
async def bstep_factory():
"""Creates a BStep instance with ephemeral references like ChatCompletionAgent."""
kernel = Kernel()
kernel.add_service(AzureChatCompletion())
agent = ChatCompletionAgent(kernel=kernel, name="echo", instructions="repeat the input back")
step_instance = BStep()
step_instance.agent = agent
return step_instance
class BStep(KernelProcessStep):
"""A sample BStep that optionally holds a ChatCompletionAgent.
By design, the agent is ephemeral (not stored in state).
"""
# Ephemeral references won't be persisted to Dapr
# because we do not place them in a step state model.
# We'll set this in the factory function:
agent: ChatCompletionAgent | None = None
@kernel_function(name="do_it")
async def do_it(self, context: KernelProcessStepContext):
print("##### BStep ran (do_it).")
await asyncio.sleep(2)
if self.agent:
history = ChatHistory()
history.add_user_message("Hello from BStep!")
async for msg in self.agent.invoke(history):
print(f"BStep got agent response: {msg.content}")
await context.emit_event(process_event="BStepDone", data="I did B")
```
Although this isn't explicitly necessary with the local runtime, the
factory callback will also work, if desired.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adds the ability to specify a factory callback for a step in the process
framework.
- Adjusts the Dapr FastAPI demo sample to show how one can include a
dependency like a `ChatCompletionAgent` and use the factory callback for
`BStep`. Although the output from the agent isn't needed, it
demonstrates the capability to handle these types of dependencies while
running Dapr.
- Adds unit tests
- Closes #10409
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-11 10:39:08 +09:00
|
|
|
"""Register a step type with optional constructor arguments.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
step_type: The step type.
|
|
|
|
|
name: The name of the step. Defaults to None.
|
|
|
|
|
initial_state: The initial state of the step. Defaults to None.
|
|
|
|
|
factory_function: The factory function. Allows for a callable that is used to create the step instance
|
|
|
|
|
that may have complex dependencies that cannot be JSON serialized or deserialized. Defaults to None.
|
2025-04-29 07:51:11 +09:00
|
|
|
aliases: The aliases of the step. Defaults to None.
|
Python: Allow for factory callbacks in the process framework (#10451)
### Motivation and Context
In the current Python process framework, in runtimes like Dapr, there is
no easy way to pass complex (unserializable) dependencies to a step --
this includes things like a ChatCompletion service or an agent of type
ChatCompletionAgent.
Similar to how the kernel dependency was propagated to the step_actor or
process_actor, we're introducing the ability to specify a factory
callback that will be called as the step is instantiated. The factory is
created, if specified via the optional kwarg when adding a step to the
process builder like:
```python
myBStep = process.add_step(step_type=BStep, factory_function=bstep_factory)
```
The `bstep_factory` looks like (along with its corresponding step)
```python
async def bstep_factory():
"""Creates a BStep instance with ephemeral references like ChatCompletionAgent."""
kernel = Kernel()
kernel.add_service(AzureChatCompletion())
agent = ChatCompletionAgent(kernel=kernel, name="echo", instructions="repeat the input back")
step_instance = BStep()
step_instance.agent = agent
return step_instance
class BStep(KernelProcessStep):
"""A sample BStep that optionally holds a ChatCompletionAgent.
By design, the agent is ephemeral (not stored in state).
"""
# Ephemeral references won't be persisted to Dapr
# because we do not place them in a step state model.
# We'll set this in the factory function:
agent: ChatCompletionAgent | None = None
@kernel_function(name="do_it")
async def do_it(self, context: KernelProcessStepContext):
print("##### BStep ran (do_it).")
await asyncio.sleep(2)
if self.agent:
history = ChatHistory()
history.add_user_message("Hello from BStep!")
async for msg in self.agent.invoke(history):
print(f"BStep got agent response: {msg.content}")
await context.emit_event(process_event="BStepDone", data="I did B")
```
Although this isn't explicitly necessary with the local runtime, the
factory callback will also work, if desired.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adds the ability to specify a factory callback for a step in the process
framework.
- Adjusts the Dapr FastAPI demo sample to show how one can include a
dependency like a `ChatCompletionAgent` and use the factory callback for
`BStep`. Although the output from the agent isn't needed, it
demonstrates the capability to handle these types of dependencies while
running Dapr.
- Adds unit tests
- Closes #10409
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-11 10:39:08 +09:00
|
|
|
kwargs: Additional keyword arguments.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
The process step builder.
|
|
|
|
|
"""
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
if not inspect.isclass(step_type):
|
|
|
|
|
raise ProcessInvalidConfigurationException(
|
|
|
|
|
f"Expected a class type, but got an instance of {type(step_type).__name__}"
|
|
|
|
|
)
|
|
|
|
|
|
Python: Allow for factory callbacks in the process framework (#10451)
### Motivation and Context
In the current Python process framework, in runtimes like Dapr, there is
no easy way to pass complex (unserializable) dependencies to a step --
this includes things like a ChatCompletion service or an agent of type
ChatCompletionAgent.
Similar to how the kernel dependency was propagated to the step_actor or
process_actor, we're introducing the ability to specify a factory
callback that will be called as the step is instantiated. The factory is
created, if specified via the optional kwarg when adding a step to the
process builder like:
```python
myBStep = process.add_step(step_type=BStep, factory_function=bstep_factory)
```
The `bstep_factory` looks like (along with its corresponding step)
```python
async def bstep_factory():
"""Creates a BStep instance with ephemeral references like ChatCompletionAgent."""
kernel = Kernel()
kernel.add_service(AzureChatCompletion())
agent = ChatCompletionAgent(kernel=kernel, name="echo", instructions="repeat the input back")
step_instance = BStep()
step_instance.agent = agent
return step_instance
class BStep(KernelProcessStep):
"""A sample BStep that optionally holds a ChatCompletionAgent.
By design, the agent is ephemeral (not stored in state).
"""
# Ephemeral references won't be persisted to Dapr
# because we do not place them in a step state model.
# We'll set this in the factory function:
agent: ChatCompletionAgent | None = None
@kernel_function(name="do_it")
async def do_it(self, context: KernelProcessStepContext):
print("##### BStep ran (do_it).")
await asyncio.sleep(2)
if self.agent:
history = ChatHistory()
history.add_user_message("Hello from BStep!")
async for msg in self.agent.invoke(history):
print(f"BStep got agent response: {msg.content}")
await context.emit_event(process_event="BStepDone", data="I did B")
```
Although this isn't explicitly necessary with the local runtime, the
factory callback will also work, if desired.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adds the ability to specify a factory callback for a step in the process
framework.
- Adjusts the Dapr FastAPI demo sample to show how one can include a
dependency like a `ChatCompletionAgent` and use the factory callback for
`BStep`. Although the output from the agent isn't needed, it
demonstrates the capability to handle these types of dependencies while
running Dapr.
- Adds unit tests
- Closes #10409
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-11 10:39:08 +09:00
|
|
|
if factory_function:
|
|
|
|
|
fq_name = get_fully_qualified_name(step_type)
|
|
|
|
|
self.factories[fq_name] = factory_function
|
|
|
|
|
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
name = name or step_type.__name__
|
2025-04-29 07:51:11 +09:00
|
|
|
process_step_builder = ProcessStepBuilder(
|
|
|
|
|
type=step_type, name=name, initial_state=initial_state, aliases=aliases, **kwargs
|
|
|
|
|
)
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
self.steps.append(process_step_builder)
|
|
|
|
|
|
|
|
|
|
return process_step_builder
|
|
|
|
|
|
2025-04-29 07:51:11 +09:00
|
|
|
def add_step_from_process(
|
|
|
|
|
self, kernel_process: "ProcessBuilder", aliases: list[str] | None = None
|
|
|
|
|
) -> "ProcessBuilder":
|
|
|
|
|
"""Adds a step from the given process.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
kernel_process: The process to add.
|
|
|
|
|
aliases: The aliases of the step. Defaults to None.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
The process builder.
|
|
|
|
|
"""
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
kernel_process.has_parent_process = True
|
2025-04-29 07:51:11 +09:00
|
|
|
if aliases:
|
|
|
|
|
kernel_process.aliases = aliases
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
self.steps.append(kernel_process)
|
|
|
|
|
return kernel_process
|
|
|
|
|
|
|
|
|
|
def resolve_function_target(
|
|
|
|
|
self, function_name: str | None, parameter_name: str | None
|
|
|
|
|
) -> KernelProcessFunctionTarget:
|
|
|
|
|
"""Resolves the function target."""
|
|
|
|
|
targets = []
|
|
|
|
|
for step in self.entry_steps:
|
|
|
|
|
with contextlib.suppress(ValueError):
|
|
|
|
|
targets.append(step.resolve_function_target(function_name, parameter_name))
|
|
|
|
|
|
|
|
|
|
if len(targets) == 0:
|
|
|
|
|
raise ValueError(f"No targets found for function '{function_name}.{parameter_name}'")
|
|
|
|
|
if len(targets) > 1:
|
|
|
|
|
raise ValueError(f"Multiple targets found for function '{function_name}.{parameter_name}'")
|
|
|
|
|
|
|
|
|
|
return targets[0]
|
|
|
|
|
|
2024-11-15 09:43:15 -05:00
|
|
|
def where_input_event_is(self, event_id: str | Enum) -> "ProcessFunctionTargetBuilder":
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
"""Filters the input event."""
|
2024-11-15 09:43:15 -05:00
|
|
|
event_id_str: str = event_id.value if isinstance(event_id, Enum) else event_id
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
|
2024-11-15 09:43:15 -05:00
|
|
|
if event_id_str not in self.external_event_target_map:
|
|
|
|
|
raise ValueError(f"The process named '{self.name}' does not expose an event with Id '{event_id_str}'")
|
|
|
|
|
|
|
|
|
|
target = self.external_event_target_map[event_id_str]
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
target = copy(target)
|
|
|
|
|
target.step = self
|
2024-11-15 09:43:15 -05:00
|
|
|
target.target_event_id = event_id_str
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
return target
|
|
|
|
|
|
2024-11-15 09:43:15 -05:00
|
|
|
def on_input_event(self, event_id: str | Enum) -> "ProcessEdgeBuilder": # type: ignore
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
"""Creates a new ProcessEdgeBuilder for the input event."""
|
|
|
|
|
from semantic_kernel.processes.process_builder import ProcessBuilder # noqa: F401
|
|
|
|
|
|
|
|
|
|
ProcessEdgeBuilder.model_rebuild()
|
2024-11-15 09:43:15 -05:00
|
|
|
|
|
|
|
|
event_id_str: str = event_id.value if isinstance(event_id, Enum) else event_id
|
|
|
|
|
|
|
|
|
|
return ProcessEdgeBuilder(source=self, event_id=event_id_str)
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
|
|
|
|
|
def link_to(self, event_id: str, edge_builder: ProcessStepEdgeBuilder) -> None:
|
|
|
|
|
"""Links to the given event ID."""
|
|
|
|
|
if edge_builder.target is None:
|
|
|
|
|
raise ValueError("Target must be set before linking")
|
|
|
|
|
self.entry_steps.append(edge_builder.source)
|
|
|
|
|
self.external_event_target_map[event_id] = edge_builder.target
|
|
|
|
|
super().link_to(event_id, edge_builder)
|
|
|
|
|
|
2025-04-29 07:51:11 +09:00
|
|
|
def build_step(self, state_metadata: KernelProcessStepStateMetadata | None = None) -> KernelProcessStepInfo:
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
"""Builds the process step."""
|
|
|
|
|
# The process is a step so we can return the step info directly
|
2025-04-29 07:51:11 +09:00
|
|
|
# convert to KernelProcessStateMetadata...
|
|
|
|
|
if state_metadata is None or isinstance(state_metadata, KernelProcessStateMetadata):
|
|
|
|
|
metadata: KernelProcessStateMetadata | None = cast(KernelProcessStateMetadata | None, state_metadata)
|
|
|
|
|
else:
|
|
|
|
|
metadata = KernelProcessStateMetadata(
|
|
|
|
|
name=self.name,
|
|
|
|
|
id=self.id if self.has_parent_process else None,
|
|
|
|
|
version_info=self.version,
|
|
|
|
|
)
|
|
|
|
|
return self.build(state_metadata=metadata)
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
|
2025-04-29 07:51:11 +09:00
|
|
|
def build(self, state_metadata: KernelProcessStateMetadata | None = None) -> "KernelProcess":
|
Python: Add the Python process framework (#9363)
### Motivation and Context
An initial PR to add the foundational pieces of the Python Process
framework, which holds it design to be similar to dotnet in that step
types are added to a process builder, and later on, when the step is
run, it is first instantiated and the proper state is provided.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adding the initial process framework components:
- Closes #9354
**TODO**
- more unit tests will be added to increase the code coverage. Currently
there are several files with no (or low) code coverage.
- more samples will either be added to this PR or a subsequent PR
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2024-10-24 13:37:45 -04:00
|
|
|
"""Builds the KernelProcess."""
|
|
|
|
|
from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess
|
|
|
|
|
|
|
|
|
|
built_edges = {key: [edge.build() for edge in edges] for key, edges in self.edges.items()}
|
|
|
|
|
built_steps = [step.build_step() for step in self.steps]
|
2025-04-29 07:51:11 +09:00
|
|
|
built_steps = self._build_with_state_metadata(state_metadata=state_metadata)
|
|
|
|
|
|
|
|
|
|
process_state = KernelProcessState(
|
|
|
|
|
name=self.name, id=self.id if self.has_parent_process else None, version=self.version
|
|
|
|
|
)
|
Python: Allow for factory callbacks in the process framework (#10451)
### Motivation and Context
In the current Python process framework, in runtimes like Dapr, there is
no easy way to pass complex (unserializable) dependencies to a step --
this includes things like a ChatCompletion service or an agent of type
ChatCompletionAgent.
Similar to how the kernel dependency was propagated to the step_actor or
process_actor, we're introducing the ability to specify a factory
callback that will be called as the step is instantiated. The factory is
created, if specified via the optional kwarg when adding a step to the
process builder like:
```python
myBStep = process.add_step(step_type=BStep, factory_function=bstep_factory)
```
The `bstep_factory` looks like (along with its corresponding step)
```python
async def bstep_factory():
"""Creates a BStep instance with ephemeral references like ChatCompletionAgent."""
kernel = Kernel()
kernel.add_service(AzureChatCompletion())
agent = ChatCompletionAgent(kernel=kernel, name="echo", instructions="repeat the input back")
step_instance = BStep()
step_instance.agent = agent
return step_instance
class BStep(KernelProcessStep):
"""A sample BStep that optionally holds a ChatCompletionAgent.
By design, the agent is ephemeral (not stored in state).
"""
# Ephemeral references won't be persisted to Dapr
# because we do not place them in a step state model.
# We'll set this in the factory function:
agent: ChatCompletionAgent | None = None
@kernel_function(name="do_it")
async def do_it(self, context: KernelProcessStepContext):
print("##### BStep ran (do_it).")
await asyncio.sleep(2)
if self.agent:
history = ChatHistory()
history.add_user_message("Hello from BStep!")
async for msg in self.agent.invoke(history):
print(f"BStep got agent response: {msg.content}")
await context.emit_event(process_event="BStepDone", data="I did B")
```
Although this isn't explicitly necessary with the local runtime, the
factory callback will also work, if desired.
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
1. Why is this change required?
2. What problem does it solve?
3. What scenario does it contribute to?
4. If it fixes an open issue, please link to the issue here.
-->
### Description
Adds the ability to specify a factory callback for a step in the process
framework.
- Adjusts the Dapr FastAPI demo sample to show how one can include a
dependency like a `ChatCompletionAgent` and use the factory callback for
`BStep`. Although the output from the agent isn't needed, it
demonstrates the capability to handle these types of dependencies while
running Dapr.
- Adds unit tests
- Closes #10409
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->
### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->
- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone :smile:
2025-02-11 10:39:08 +09:00
|
|
|
return KernelProcess(state=process_state, steps=built_steps, edges=built_edges, factories=self.factories)
|
2025-04-29 07:51:11 +09:00
|
|
|
|
|
|
|
|
def _build_with_state_metadata(
|
|
|
|
|
self, state_metadata: "KernelProcessStateMetadata | None"
|
|
|
|
|
) -> list["KernelProcessStepInfo"]:
|
|
|
|
|
built_steps: list["KernelProcessStepInfo"] = []
|
|
|
|
|
|
|
|
|
|
# 1- Validate StateMetadata: Migrate previous state versions if needed + sanitize state
|
|
|
|
|
sanitized_metadata: "KernelProcessStateMetadata | None" = None
|
|
|
|
|
if state_metadata is not None:
|
|
|
|
|
sanitized_metadata = self._sanitize_process_state_metadata(state_metadata, self.steps)
|
|
|
|
|
|
|
|
|
|
# 2- Build steps info with validated stateMetadata
|
|
|
|
|
for step in self.steps:
|
|
|
|
|
if (
|
|
|
|
|
sanitized_metadata
|
|
|
|
|
and sanitized_metadata.steps_state
|
|
|
|
|
and step.name in sanitized_metadata.steps_state
|
|
|
|
|
and sanitized_metadata.steps_state[step.name] is not None
|
|
|
|
|
):
|
|
|
|
|
built_steps.append(step.build_step(sanitized_metadata.steps_state[step.name]))
|
|
|
|
|
else:
|
|
|
|
|
built_steps.append(step.build_step())
|
|
|
|
|
|
|
|
|
|
return built_steps
|
|
|
|
|
|
|
|
|
|
def _sanitize_process_state_metadata(
|
|
|
|
|
self, state_metadata: "KernelProcessStateMetadata", step_builders: list["ProcessStepBuilder"]
|
|
|
|
|
) -> "KernelProcessStateMetadata":
|
|
|
|
|
sanitized_state_metadata = state_metadata
|
|
|
|
|
|
|
|
|
|
for step in step_builders:
|
|
|
|
|
# 1- find matching key name with exact match or by alias match
|
|
|
|
|
step_key: str | None = None
|
|
|
|
|
if sanitized_state_metadata.steps_state and step.name in sanitized_state_metadata.steps_state:
|
|
|
|
|
step_key = step.name
|
|
|
|
|
else:
|
|
|
|
|
step_key = next(
|
|
|
|
|
(
|
|
|
|
|
alias
|
|
|
|
|
for alias in step.aliases
|
|
|
|
|
if sanitized_state_metadata.steps_state and alias in sanitized_state_metadata.steps_state
|
|
|
|
|
),
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# 2- stepKey match found
|
|
|
|
|
if step_key is not None:
|
|
|
|
|
current_version_state_metadata = to_process_state_metadata(step.build_step())
|
|
|
|
|
saved_state_metadata = sanitized_state_metadata.steps_state.get(step_key)
|
|
|
|
|
|
|
|
|
|
if saved_state_metadata is not None and step_key != step.name:
|
|
|
|
|
if saved_state_metadata.version_info == current_version_state_metadata.version_info:
|
|
|
|
|
# key mismatch only, but same version
|
|
|
|
|
sanitized_state_metadata.steps_state[step.name] = saved_state_metadata
|
|
|
|
|
else:
|
|
|
|
|
# version mismatch - check if migration logic in place
|
|
|
|
|
if isinstance(step, ProcessBuilder):
|
|
|
|
|
if isinstance(saved_state_metadata, KernelProcessStepStateMetadata):
|
|
|
|
|
saved_state_metadata = KernelProcessStateMetadata(
|
|
|
|
|
name=step.name,
|
|
|
|
|
id=step.id,
|
|
|
|
|
version_info=step.version,
|
|
|
|
|
steps_state={},
|
|
|
|
|
)
|
|
|
|
|
sanitized_step_state = self._sanitize_process_state_metadata(
|
|
|
|
|
saved_state_metadata, step.steps
|
|
|
|
|
)
|
|
|
|
|
sanitized_state_metadata.steps_state[step.name] = sanitized_step_state
|
|
|
|
|
else:
|
|
|
|
|
# no compatible state found, migrating id only
|
|
|
|
|
sanitized_state_metadata.steps_state[step.name] = type(saved_state_metadata)(
|
|
|
|
|
Name=step.name,
|
|
|
|
|
Id=step.id,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
sanitized_state_metadata.steps_state[step.name].name = step.name
|
|
|
|
|
del sanitized_state_metadata.steps_state[step_key]
|
|
|
|
|
|
|
|
|
|
return sanitized_state_metadata
|