Memray Profiling

Name: flytekitplugins-memray
Version: 0.0.0+develop
Author: admin@flyte.org
Provides: flytekitplugins.memray
Requires: flytekit>=1.12.0, memray
Python: >=3.9
License: apache2
Source Code: https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-memray
  • Intended Audience :: Science/Research
  • Intended Audience :: Developers
  • License :: OSI Approved :: Apache Software License
  • Programming Language :: Python :: 3.9
  • Programming Language :: Python :: 3.10
  • Programming Language :: Python :: 3.11
  • Programming Language :: Python :: 3.12
  • Topic :: Scientific/Engineering
  • Topic :: Scientific/Engineering :: Artificial Intelligence
  • Topic :: Software Development
  • Topic :: Software Development :: Libraries
  • Topic :: Software Development :: Libraries :: Python Modules

Memray tracks and reports memory allocations, both in Python code and in compiled extension modules. The Memray Profiling plugin enables memory tracking at the Flyte task level and renders the resulting Memray HTML report (for example, a flame graph, or a table when `memray_html_reporter="table"` is set) on Flyte Deck.
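To make "memory tracking at the task level" concrete without Flyte or Memray installed, here is a minimal sketch that uses Python's standard-library `tracemalloc` as a stand-in. It is not the plugin's implementation: `track_allocations`, `last_report`, and `build_strings` are hypothetical names, and `tracemalloc` only sees allocations made through Python's own allocators, whereas Memray also tracks native allocations in compiled extension modules.

```python
import functools
import tracemalloc
from typing import Any, Callable, Dict

# Hypothetical result holder; the real plugin renders an HTML report on Flyte Deck instead.
last_report: Dict[str, int] = {}


def track_allocations(func: Callable[..., Any]) -> Callable[..., Any]:
    """Trace Python-level allocations for the duration of one call (tracemalloc, not Memray)."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        tracemalloc.start()
        try:
            return func(*args, **kwargs)
        finally:
            # Bytes still allocated after the call returned, and the peak seen during it.
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            last_report["current_bytes"] = current
            last_report["peak_bytes"] = peak

    return wrapper


@track_allocations
def build_strings(n: int) -> int:
    data = [" " * 10**6 for _ in range(n)]  # roughly 1 MB per string
    return len(data)


build_strings(5)
print(last_report)
```

The gap between `peak_bytes` and `current_bytes` roughly mirrors the difference between a peak-memory view and a leak view of the same run.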

To install the plugin, run the following command:

pip install flytekitplugins-memray

Example

import time

from flytekit import ImageSpec, task, workflow
from flytekitplugins.memray import memray_profiling


image = ImageSpec(
    name="memray_demo",
    packages=["flytekitplugins_memray"],
    registry="<your_cr_registry>",
)


def generate_data(n: int) -> None:
    leak_list = []
    for _ in range(n):  # n iterations, roughly 1 MB allocated per iteration
        large_data = " " * 10**6  # 1 MB string
        leak_list.append(large_data)  # Stays referenced until generate_data returns
        time.sleep(0.1)  # Slow down the loop to observe memory changes


@task(container_image=image, enable_deck=True)
@memray_profiling(memray_html_reporter="table")
def memory_usage(n: int) -> str:
    generate_data(n=n)

    return "Well"


@task(container_image=image, enable_deck=True)
@memray_profiling(trace_python_allocators=True, memray_reporter_args=["--leaks"])
def memory_leakage(n: int) -> str:
    generate_data(n=n)

    return "Well"


@workflow
def wf(n: int = 500):
    memory_usage(n=n)
    memory_leakage(n=n)