#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABC, abstractmethod
from io import StringIO
import cProfile
import os
import pstats
from threading import RLock
from types import CodeType, TracebackType
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Literal,
    Optional,
    Tuple,
    Union,
    TYPE_CHECKING,
    overload,
)
import warnings

import pyspark.memory_profiler_ext
from pyspark.accumulators import (
    Accumulator,
    AccumulatorParam,
    SpecialAccumulatorIds,
    _accumulatorRegistry,
)
from pyspark.errors import PySparkValueError
from pyspark.profiler import (
    CodeMapDict,
    MemoryProfiler,
    MemUsageParam,
    PStatsParam,
)

if TYPE_CHECKING:
    from pyspark.sql._typing import ProfileResults


class _ProfileResultsParam(AccumulatorParam[Optional["ProfileResults"]]):
    """
    AccumulatorParam for profilers.

    A profile-results value maps a UDF id (int or str) to a
    ``(perf_stats, memory_codemap, ...)`` tuple; merging combines the
    perf and memory components element-wise via their own params.
    """

    @staticmethod
    def zero(value: Optional["ProfileResults"]) -> Optional["ProfileResults"]:
        return value

    @staticmethod
    def addInPlace(
        value1: Optional["ProfileResults"], value2: Optional["ProfileResults"]
    ) -> Optional["ProfileResults"]:
        # Normalize empty/None inputs to empty dicts so the merge below is uniform.
        if value1 is None or len(value1) == 0:
            value1 = {}
        if value2 is None or len(value2) == 0:
            value2 = {}

        value = value1.copy()
        for key, (perf, mem, *_) in value2.items():
            if key in value1:
                orig_perf, orig_mem, *_ = value1[key]
            else:
                # No prior entry for this UDF id: merge against neutral elements.
                orig_perf, orig_mem = (PStatsParam.zero(None), MemUsageParam.zero(None))
            value[key] = (
                PStatsParam.addInPlace(orig_perf, perf),
                MemUsageParam.addInPlace(orig_mem, mem),
            )
        return value


ProfileResultsParam = _ProfileResultsParam()


class WorkerPerfProfiler:
    """
    PerfProfiler is a profiler for performance profiling.
    """

    def __init__(
        self, accumulator: Accumulator[Optional["ProfileResults"]], result_key: Union[int, str]
    ) -> None:
        self._accumulator = accumulator
        self._profiler = cProfile.Profile()
        self._result_key = result_key

    def start(self) -> None:
        self._profiler.enable()

    def stop(self) -> None:
        self._profiler.disable()

    def save(self) -> None:
        st = pstats.Stats(self._profiler, stream=None)
        # make it picklable: the stream attribute would otherwise prevent the
        # Stats object from being serialized into the accumulator
        st.stream = None  # type: ignore[attr-defined]
        st.strip_dirs()
        self._accumulator.add({self._result_key: (st, None)})

    def __enter__(self) -> "WorkerPerfProfiler":
        self.start()
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Results are saved even when the profiled code raised.
        self.stop()
        self.save()


class WorkerMemoryProfiler:
    """
    MemoryProfiler is a profiler for memory profiling.
    """

    def __init__(
        self,
        accumulator: Accumulator[Optional["ProfileResults"]],
        result_key: Union[int, str],
        func_or_code: Union[Callable, CodeType],
    ) -> None:
        from pyspark.memory_profiler_ext import UDFLineProfilerV2

        self._accumulator = accumulator
        self._profiler = UDFLineProfilerV2()
        if isinstance(func_or_code, CodeType):
            self._profiler.add_code(func_or_code)
        else:
            self._profiler.add_function(func_or_code)
        self._result_key = result_key

    def start(self) -> None:
        self._profiler.enable_by_count()

    def stop(self) -> None:
        self._profiler.disable_by_count()

    def save(self) -> None:
        # Materialize the per-line iterators so the code map is picklable.
        codemap_dict = {
            filename: list(line_iterator)
            for filename, line_iterator in self._profiler.code_map.items()
        }
        self._accumulator.add({self._result_key: (None, codemap_dict)})

    def __enter__(self) -> "WorkerMemoryProfiler":
        self.start()
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Results are saved even when the profiled code raised.
        self.stop()
        self.save()


class ProfilerCollector(ABC):
    """
    A base class of profiler collectors for session based profilers.

    This supports cProfiler and memory-profiler enabled by setting a SQL config
    `spark.sql.pyspark.udf.profiler` to "perf" or "memory".
    """

    def __init__(self) -> None:
        self._lock = RLock()

    def _sorted_keys(self, keys: Iterable[Union[int, str]]) -> list[Union[int, str]]:
        # String keys (named results) come first, then numeric UDF ids,
        # each group in its natural sort order.
        int_keys = sorted(x for x in keys if isinstance(x, int))
        str_keys = sorted(x for x in keys if isinstance(x, str))
        return str_keys + int_keys

    def _warn_if_memory_profiler_missing(
        self, code_map: Dict[Union[int, str], CodeMapDict]
    ) -> None:
        # Without the optional 'memory_profiler' package no memory results can
        # ever be collected, so an empty result set deserves a hint.
        if not pyspark.memory_profiler_ext.has_memory_profiler and not code_map:
            warnings.warn(
                "Install the 'memory_profiler' library in the cluster to enable memory profiling",
                UserWarning,
            )

    def show_perf_profiles(self, id: Optional[Union[int, str]] = None) -> None:
        """
        Show the perf profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            stats = self._perf_profile_results

        def show(id: Union[int, str]) -> None:
            s = stats.get(id)
            if s is not None:
                print("=" * 60)
                if isinstance(id, str):
                    print(f"Profile of {id}")
                else:
                    # Include the numeric id so multiple UDF profiles are distinguishable.
                    print(f"Profile of UDF<id={id}>")
                print("=" * 60)
                s.sort_stats("time", "cumulative").print_stats()

        if id is not None:
            show(id)
        else:
            for id in self._sorted_keys(stats.keys()):
                show(id)

    @property
    def _perf_profile_results(self) -> Dict[Union[int, str], pstats.Stats]:
        with self._lock:
            return {
                result_id: perf
                for result_id, (perf, _, *_) in self._profile_results.items()
                if perf is not None
            }

    def show_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
        """
        Show the memory profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            code_map = self._memory_profile_results

        self._warn_if_memory_profiler_missing(code_map)

        def show(id: Union[int, str]) -> None:
            cm = code_map.get(id)
            if cm is not None:
                print("=" * 60)
                if isinstance(id, str):
                    print(f"Profile of {id}")
                else:
                    # Include the numeric id so multiple UDF profiles are distinguishable.
                    print(f"Profile of UDF<id={id}>")
                print("=" * 60)
                MemoryProfiler._show_results(cm)

        if id is not None:
            show(id)
        else:
            for id in self._sorted_keys(code_map.keys()):
                show(id)

    @property
    def _memory_profile_results(self) -> Dict[Union[int, str], CodeMapDict]:
        with self._lock:
            return {
                result_id: mem
                for result_id, (_, mem, *_) in self._profile_results.items()
                if mem is not None
            }

    @property
    @abstractmethod
    def _profile_results(self) -> "ProfileResults":
        """
        Get the profile results.
        """
        ...

    def dump_perf_profiles(self, path: str, id: Optional[Union[int, str]] = None) -> None:
        """
        Dump the perf profile results into directory `path`.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path: str
            A directory in which to dump the perf profile.
        id : int or str, optional
            A UDF ID to be dumped. If not specified, all the results will be dumped.
        """
        with self._lock:
            stats = self._perf_profile_results

        def dump(id: Union[int, str]) -> None:
            s = stats.get(id)
            if s is not None:
                os.makedirs(path, exist_ok=True)
                p = os.path.join(path, f"udf_{id}_perf.pstats")
                s.dump_stats(p)

        if id is not None:
            dump(id)
        else:
            for id in self._sorted_keys(stats.keys()):
                dump(id)

    def dump_memory_profiles(self, path: str, id: Optional[Union[int, str]] = None) -> None:
        """
        Dump the memory profile results into directory `path`.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path: str
            A directory in which to dump the memory profile.
        id : int or str, optional
            A UDF ID to be dumped. If not specified, all the results will be dumped.
        """
        with self._lock:
            code_map = self._memory_profile_results

        self._warn_if_memory_profiler_missing(code_map)

        def dump(id: Union[int, str]) -> None:
            cm = code_map.get(id)
            if cm is not None:
                os.makedirs(path, exist_ok=True)
                p = os.path.join(path, f"udf_{id}_memory.txt")
                with open(p, "w+") as f:
                    MemoryProfiler._show_results(cm, stream=f)

        if id is not None:
            dump(id)
        else:
            for id in self._sorted_keys(code_map.keys()):
                dump(id)

    def clear_perf_profiles(self, id: Optional[Union[int, str]] = None) -> None:
        """
        Clear the perf profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int or str, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        """
        with self._lock:
            if id is not None:
                if id in self._profile_results:
                    perf, mem, *_ = self._profile_results[id]
                    self._profile_results[id] = (None, mem, *_)
                    # Drop the entry entirely when no memory results remain either.
                    if mem is None:
                        self._profile_results.pop(id, None)
            else:
                for id, (perf, mem, *_) in list(self._profile_results.items()):
                    self._profile_results[id] = (None, mem, *_)
                    if mem is None:
                        self._profile_results.pop(id, None)

    def clear_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
        """
        Clear the memory profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int or str, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        """
        with self._lock:
            if id is not None:
                if id in self._profile_results:
                    perf, mem, *_ = self._profile_results[id]
                    self._profile_results[id] = (perf, None, *_)
                    # Drop the entry entirely when no perf results remain either.
                    if perf is None:
                        self._profile_results.pop(id, None)
            else:
                for id, (perf, mem, *_) in list(self._profile_results.items()):
                    self._profile_results[id] = (perf, None, *_)
                    if perf is None:
                        self._profile_results.pop(id, None)


class AccumulatorProfilerCollector(ProfilerCollector):
    """A ProfilerCollector backed by the special UDF-profiler accumulator."""

    def __init__(self) -> None:
        super().__init__()
        # Reuse the accumulator if it was already registered in this process.
        if SpecialAccumulatorIds.SQL_UDF_PROFIER in _accumulatorRegistry:
            self._accumulator = _accumulatorRegistry[SpecialAccumulatorIds.SQL_UDF_PROFIER]
        else:
            self._accumulator = Accumulator(
                SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
            )

    @property
    def _profile_results(self) -> "ProfileResults":
        with self._lock:
            value = self._accumulator.value
            return value if value is not None else {}


class Profile:
    """User-facing profile API. This instance can be accessed by
    :attr:`spark.profile`.

    .. versionadded:: 4.0.0
    """

    def __init__(self, profiler_collector: ProfilerCollector):
        self.profiler_collector = profiler_collector

    def show(self, id: Optional[Union[int, str]] = None, *, type: Optional[str] = None) -> None:
        """
        Show the profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int or str, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        type : str, optional
            The profiler type, which can be either "perf" or "memory".

        Notes
        -----
        The results are gathered from all Python executions. For example, if there are
        8 tasks, each processing 1,000 rows, the total output will display the results
        for 8,000 rows.
        """
        if type == "memory":
            self.profiler_collector.show_memory_profiles(id)
        elif type == "perf" or type is None:
            self.profiler_collector.show_perf_profiles(id)
            if type is None:
                # Show both perf and memory profiles
                self.profiler_collector.show_memory_profiles(id)
        else:
            raise PySparkValueError(
                errorClass="VALUE_NOT_ALLOWED",
                messageParameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )

    def dump(
        self, path: str, id: Optional[Union[int, str]] = None, *, type: Optional[str] = None
    ) -> None:
        """
        Dump the profile results into directory `path`.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path: str
            A directory in which to dump the profile.
        id : int or str, optional
            A UDF ID to be dumped. If not specified, all the results will be dumped.
        type : str, optional
            The profiler type, which can be either "perf" or "memory".
        """
        if type == "memory":
            self.profiler_collector.dump_memory_profiles(path, id)
        elif type == "perf" or type is None:
            self.profiler_collector.dump_perf_profiles(path, id)
            if type is None:
                # Dump both perf and memory profiles
                self.profiler_collector.dump_memory_profiles(path, id)
        else:
            raise PySparkValueError(
                errorClass="VALUE_NOT_ALLOWED",
                messageParameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )

    @overload
    def render(
        self, id: Union[int, str], *, type: Optional[str] = None, renderer: Optional[str] = None
    ) -> Any:
        ...

    @overload
    def render(
        self,
        id: Union[int, str],
        *,
        type: Optional[Literal["perf"]],
        renderer: Callable[[pstats.Stats], Any],
    ) -> Any:
        ...

    @overload
    def render(
        self,
        id: Union[int, str],
        *,
        type: Literal["memory"],
        renderer: Callable[[CodeMapDict], Any],
    ) -> Any:
        ...

    def render(
        self,
        id: Union[int, str],
        *,
        type: Optional[str] = None,
        renderer: Optional[
            Union[str, Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]
        ] = None,
    ) -> Any:
        """
        Render the profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int or str
            The UDF ID whose profiling results should be rendered.
        type : str, optional
            The profiler type to render results for, which can be either "perf" or "memory".
            If not specified, defaults to "perf".
        renderer : str or callable, optional
            The renderer to use. If not specified, the default renderer will be "flameprof"
            for the "perf" profiler, which returns an :class:`IPython.display.HTML` object
            in an IPython environment to draw the figure; otherwise, it returns the SVG
            source string. For the "memory" profiler, no default renderer is provided.

            If a callable is provided, it should take a `pstats.Stats` object for
            "perf" profiler, and `CodeMapDict` for "memory" profiler, and return
            the rendered result.
        """
        result: Optional[Union[pstats.Stats, CodeMapDict]]
        if type is None:
            type = "perf"
        if type == "perf":
            result = self.profiler_collector._perf_profile_results.get(id)
        elif type == "memory":
            result = self.profiler_collector._memory_profile_results.get(id)
        else:
            raise PySparkValueError(
                errorClass="VALUE_NOT_ALLOWED",
                messageParameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )

        # Resolve the renderer: either look up a named/default renderer in the
        # registry, or accept a user-provided callable as-is.
        render: Optional[Union[Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]] = None
        if renderer is None or isinstance(renderer, str):
            render = _renderers.get((type, renderer))
        elif callable(renderer):
            render = renderer
        if render is None:
            raise PySparkValueError(
                errorClass="VALUE_NOT_ALLOWED",
                messageParameters={
                    "arg_name": "(type, renderer)",
                    "allowed_values": str(list(_renderers.keys())),
                },
            )

        # Silently return None when there is no result for the given id.
        if result is not None:
            return render(result)  # type: ignore[arg-type]

    def clear(self, id: Optional[Union[int, str]] = None, *, type: Optional[str] = None) -> None:
        """
        Clear the profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int or str, optional
            The UDF ID whose profiling results should be cleared.
            If not specified, all the results will be cleared.
        type : str, optional
            The profiler type to clear results for, which can be either "perf" or "memory".
        """
        if type == "memory":
            self.profiler_collector.clear_memory_profiles(id)
        elif type == "perf" or type is None:
            self.profiler_collector.clear_perf_profiles(id)
            if type is None:
                # Clear both perf and memory profiles
                self.profiler_collector.clear_memory_profiles(id)
        else:
            raise PySparkValueError(
                errorClass="VALUE_NOT_ALLOWED",
                messageParameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )


def _render_flameprof(stats: pstats.Stats) -> Any:
    """Render perf stats as a flame graph SVG via the optional `flameprof` package.

    Returns an :class:`IPython.display.HTML` object in an IPython environment,
    otherwise the raw SVG source string.
    """
    try:
        from flameprof import render
    except ImportError:
        raise PySparkValueError(
            errorClass="PACKAGE_NOT_INSTALLED",
            messageParameters={"package_name": "flameprof", "minimum_version": "0.4"},
        )

    buf = StringIO()
    render(stats.stats, buf)  # type: ignore[attr-defined]
    svg = buf.getvalue()

    try:
        import IPython

        ipython = IPython.get_ipython()
    except ImportError:
        ipython = None

    if ipython:
        from IPython.display import HTML

        return HTML(svg)
    else:
        return svg


# Registry of named renderers keyed by (profiler type, renderer name);
# the None name maps to the default renderer for that profiler type.
_renderers: Dict[
    Tuple[str, Optional[str]], Union[Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]
] = {
    ("perf", None): _render_flameprof,
    ("perf", "flameprof"): _render_flameprof,
}