# Source repository: The-Pocket / PocketFlow (Python)
# Pocket Flow: 100-line LLM framework. Let Agents build Agents!
2024-12-26 01:38:54 +00:00
import unittest
import sys
from pathlib import Path
2024-12-29 02:40:27 +00:00
sys.path.insert(0, str(Path(__file__).parent.parent))
2025-01-09 03:01:25 +00:00
from pocketflow import Node, BatchFlow, Flow
2024-12-26 01:38:54 +00:00
class DataProcessNode(Node):
    """Test node that doubles one input value per batch item.

    The item key is read from ``self.params['key']``; the value is looked up
    in ``shared_storage['input_data']`` and twice that value is stored under
    ``shared_storage['results'][key]``.
    """

    def prep(self, shared_storage):
        """Double the value for the current key and record it in ``results``.

        Raises:
            KeyError: if the key is not present in ``shared_storage['input_data']``.
        """
        key = self.params.get('key')
        data = shared_storage['input_data'][key]
        # Create the results dict on first use so successive batch items
        # accumulate into the same mapping.
        shared_storage.setdefault('results', {})[key] = data * 2
class ErrorProcessNode(Node):
    """Test node that fails for ``'error_key'`` and marks any other key as processed."""

    def prep(self, shared_storage):
        """Record ``True`` for the current key, or raise for the poisoned key.

        Raises:
            ValueError: if ``self.params['key']`` equals ``'error_key'``.
        """
        key = self.params.get('key')
        if key == 'error_key':
            raise ValueError(f"Error processing key: {key}")
        # Create the results dict on first use so successive batch items
        # accumulate into the same mapping.
        shared_storage.setdefault('results', {})[key] = True
class TestBatchFlow(unittest.TestCase):
    """Tests for ``BatchFlow`` using nodes that read per-item values from ``self.params``.

    In every test the ``BatchFlow`` subclass's ``prep`` returns a list of
    parameter dicts, and the nodes write their output into the shared store
    so the test can compare it against the expected mapping.
    """

    def setUp(self):
        """Create the doubling node shared by the simple batch tests."""
        self.process_node = DataProcessNode()

    def test_basic_batch_processing(self):
        """Test basic batch processing with multiple keys"""
        class SimpleTestBatchFlow(BatchFlow):
            def prep(self, shared_storage):
                return [{'key': k} for k in shared_storage['input_data']]

        shared_storage = {
            'input_data': {
                'a': 1,
                'b': 2,
                'c': 3,
            }
        }

        flow = SimpleTestBatchFlow(start=self.process_node)
        flow.run(shared_storage)

        expected_results = {
            'a': 2,
            'b': 4,
            'c': 6,
        }
        self.assertEqual(shared_storage['results'], expected_results)

    def test_empty_input(self):
        """Test batch processing with empty input dictionary"""
        class EmptyTestBatchFlow(BatchFlow):
            def prep(self, shared_storage):
                return [{'key': k} for k in shared_storage['input_data']]

        shared_storage = {
            'input_data': {}
        }

        flow = EmptyTestBatchFlow(start=self.process_node)
        flow.run(shared_storage)

        # With no batch items the node never runs, so 'results' may be absent.
        self.assertEqual(shared_storage.get('results', {}), {})

    def test_single_item(self):
        """Test batch processing with single item"""
        class SingleItemBatchFlow(BatchFlow):
            def prep(self, shared_storage):
                return [{'key': k} for k in shared_storage['input_data']]

        shared_storage = {
            'input_data': {
                'single': 5,
            }
        }

        flow = SingleItemBatchFlow(start=self.process_node)
        flow.run(shared_storage)

        expected_results = {
            'single': 10,
        }
        self.assertEqual(shared_storage['results'], expected_results)

    def test_error_handling(self):
        """Test error handling during batch processing"""
        class ErrorTestBatchFlow(BatchFlow):
            def prep(self, shared_storage):
                return [{'key': k} for k in shared_storage['input_data']]

        shared_storage = {
            'input_data': {
                'normal_key': 1,
                'error_key': 2,
                'another_key': 3,
            }
        }

        flow = ErrorTestBatchFlow(start=ErrorProcessNode())

        # The node's ValueError must surface from run() rather than be swallowed.
        with self.assertRaises(ValueError):
            flow.run(shared_storage)

    def test_nested_flow(self):
        """Test batch processing over a chain of two nodes (inner_node >> outer_node)"""
        # Both nodes go through the shared store handed to prep/post, so the
        # test depends on the flow passing that store along, not on a closure
        # over this method's local variable.
        class InnerNode(Node):
            def prep(self, shared_storage):
                return shared_storage['input_data'][self.params['key']]

            def exec(self, prep_result):
                return prep_result + 1

            def post(self, shared_storage, prep_result, exec_result):
                intermediate = shared_storage.setdefault('intermediate_results', {})
                intermediate[self.params['key']] = exec_result

        class OuterNode(Node):
            def prep(self, shared_storage):
                return shared_storage['intermediate_results'][self.params['key']]

            def exec(self, prep_result):
                return prep_result * 2

            def post(self, shared_storage, prep_result, exec_result):
                shared_storage.setdefault('results', {})[self.params['key']] = exec_result

        class NestedBatchFlow(BatchFlow):
            def prep(self, shared_storage):
                return [{'key': k} for k in shared_storage['input_data']]

        # Chain the two nodes: inner_node runs first, then outer_node
        inner_node = InnerNode()
        outer_node = OuterNode()
        inner_node >> outer_node

        shared_storage = {
            'input_data': {
                'x': 1,
                'y': 2,
            }
        }

        flow = NestedBatchFlow(start=inner_node)
        flow.run(shared_storage)

        expected_results = {
            'x': 4,  # (1 + 1) * 2
            'y': 6,  # (2 + 1) * 2
        }
        self.assertEqual(shared_storage['results'], expected_results)

    def test_custom_parameters(self):
        """Test batch processing with additional custom parameters"""
        # The node uses the shared store handed to prep/post instead of a
        # closure over this method's local variable.
        class CustomParamNode(Node):
            def prep(self, shared_storage):
                return shared_storage['input_data'][self.params['key']]

            def exec(self, prep_result):
                # 'multiplier' is optional; default to the identity factor.
                return prep_result * self.params.get('multiplier', 1)

            def post(self, shared_storage, prep_result, exec_result):
                shared_storage.setdefault('results', {})[self.params['key']] = exec_result

        class CustomParamBatchFlow(BatchFlow):
            def prep(self, shared_storage):
                return [{
                    'key': k,
                    'multiplier': i + 1,
                } for i, k in enumerate(shared_storage['input_data'])]

        shared_storage = {
            'input_data': {
                'a': 1,
                'b': 2,
                'c': 3,
            }
        }

        flow = CustomParamBatchFlow(start=CustomParamNode())
        flow.run(shared_storage)

        expected_results = {
            'a': 1 * 1,  # first item, multiplier = 1
            'b': 2 * 2,  # second item, multiplier = 2
            'c': 3 * 3,  # third item, multiplier = 3
        }
        self.assertEqual(shared_storage['results'], expected_results)

    def test_nested_batch_flow(self):
        """Test BatchFlow nested inside another BatchFlow (outer iterates groups, inner iterates items)"""
        class ItemNode(Node):
            def prep(self, shared_storage):
                return shared_storage['groups'][self.params['group']][self.params['item']]

            def exec(self, prep_res):
                return prep_res * 2

            def post(self, shared_storage, prep_res, exec_res):
                results = shared_storage.setdefault('results', {})
                results.setdefault(self.params['group'], []).append(exec_res)

        class InnerBatchFlow(BatchFlow):
            def prep(self, shared_storage):
                group = self.params['group']
                items = shared_storage['groups'][group]
                # One batch entry per item index within the current group.
                return [{'item': i, 'group': group} for i in range(len(items))]

        class OuterBatchFlow(BatchFlow):
            def prep(self, shared_storage):
                return [{'group': g} for g in shared_storage['groups']]

        item_node = ItemNode()
        inner_flow = InnerBatchFlow(start=item_node)
        outer_flow = OuterBatchFlow(start=inner_flow)

        shared_storage = {
            'groups': {
                'A': [1, 2],
                'B': [3, 4],
            }
        }

        outer_flow.run(shared_storage)

        expected = {
            'A': [2, 4],
            'B': [6, 8],
        }
        self.assertEqual(shared_storage['results'], expected)
2024-12-26 01:38:54 +00:00
# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()