Module livekit.agents.ipc.proc_pool

Classes

class ProcPool (*,
initialize_process_fnc: Callable[[JobProcess], Any],
job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]],
num_idle_processes: int,
initialize_timeout: float,
close_timeout: float,
inference_executor: inference_executor.InferenceExecutor | None,
job_executor_type: JobExecutorType,
mp_ctx: BaseContext,
memory_warn_mb: float,
memory_limit_mb: float,
http_proxy: str | None,
loop: asyncio.AbstractEventLoop)
Expand source code
class ProcPool(utils.EventEmitter[EventTypes]):
    """Pool of job executors that keeps pre-initialized ("warmed") executors ready.

    A background task (started by :meth:`start`) keeps the number of warmed plus
    in-flight executors at ``min(target_idle_processes, num_idle_processes)``.
    :meth:`launch_job` consumes one warmed executor per job and spawns an extra
    one on demand when none is idle.

    Events emitted, each with the executor as argument: ``process_created``,
    ``process_started``, ``process_ready``, ``process_job_launched`` and
    ``process_closed``.
    """

    def __init__(
        self,
        *,
        initialize_process_fnc: Callable[[JobProcess], Any],
        job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]],
        num_idle_processes: int,
        initialize_timeout: float,
        close_timeout: float,
        inference_executor: inference_executor.InferenceExecutor | None,
        job_executor_type: JobExecutorType,
        mp_ctx: BaseContext,
        memory_warn_mb: float,
        memory_limit_mb: float,
        http_proxy: str | None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Store the pool configuration; nothing is spawned until :meth:`start`.

        Args:
            initialize_process_fnc: Forwarded to every executor the pool creates.
            job_entrypoint_fnc: Forwarded to every executor the pool creates.
            num_idle_processes: Number of warmed executors :meth:`start` waits
                for; also the initial idle target and the upper bound applied
                to the target by the background task.
            initialize_timeout: Forwarded to every executor.
            close_timeout: Forwarded to every executor.
            inference_executor: Forwarded to every executor; may be ``None``.
            job_executor_type: Selects between thread and process executors.
            mp_ctx: Forwarded to process executors only.
            memory_warn_mb: Forwarded to process executors only.
            memory_limit_mb: Forwarded to process executors only.
            http_proxy: Forwarded to every executor; may be ``None``.
            loop: Forwarded to every executor.
        """
        super().__init__()
        self._job_executor_type = job_executor_type
        self._mp_ctx = mp_ctx
        self._initialize_process_fnc = initialize_process_fnc
        self._job_entrypoint_fnc = job_entrypoint_fnc
        self._close_timeout = close_timeout
        self._inf_executor = inference_executor
        self._initialize_timeout = initialize_timeout
        self._loop = loop
        self._memory_limit_mb = memory_limit_mb
        self._memory_warn_mb = memory_warn_mb
        self._default_num_idle_processes = num_idle_processes
        self._http_proxy = http_proxy
        self._target_idle_processes = num_idle_processes

        # bounds how many executors run start()/initialize() at the same time
        self._init_sem = asyncio.Semaphore(MAX_CONCURRENT_INITIALIZATIONS)
        # executors that finished initialization and are waiting for a job
        self._warmed_proc_queue = asyncio.Queue[JobExecutor]()
        # every executor created by the pool that has not been removed yet
        self._executors: list[JobExecutor] = []
        self._spawn_tasks: set[asyncio.Task[None]] = set()
        self._monitor_tasks: set[asyncio.Task[None]] = set()
        self._started = False
        self._closed = False

        # set once the warmed queue first reaches num_idle_processes entries
        self._idle_ready = asyncio.Event()
        # number of launch_job() calls currently waiting for a warmed executor
        self._jobs_waiting_for_process = 0

    @property
    def processes(self) -> list[JobExecutor]:
        """Executors currently tracked by the pool (the internal list, not a copy)."""
        return self._executors

    def get_by_job_id(self, job_id: str) -> JobExecutor | None:
        """Return the first executor whose running job has id ``job_id``, else ``None``."""
        return next(
            (x for x in self._executors if x.running_job and x.running_job.job.id == job_id),
            None,
        )

    async def start(self) -> None:
        """Start the background task; a repeated call is a no-op.

        When ``num_idle_processes`` is greater than zero this also waits until
        that many executors are warmed.

        NOTE(review): the wait has no timeout; it does not return if the warmed
        queue never reaches ``num_idle_processes`` entries.
        """
        if self._started:
            return

        self._started = True
        self._main_atask = asyncio.create_task(self._main_task())

        if self._default_num_idle_processes > 0:
            # wait for the idle processes to be warmed up (by the main task)
            await self._idle_ready.wait()

    async def aclose(self) -> None:
        """Mark the pool closed and cancel the background task, waiting for it to finish.

        Does nothing when the pool was never started.
        """
        if not self._started:
            return

        self._closed = True
        await aio.cancel_and_wait(self._main_atask)

    async def launch_job(self, info: RunningJobInfo) -> None:
        """Run ``info`` on a warmed executor, spawning one first if none is idle.

        Waits until a warmed executor is available, launches the job on it and
        emits ``process_job_launched``.

        Args:
            info: Job description handed to the executor's ``launch_job``.
        """
        self._jobs_waiting_for_process += 1
        try:
            if (
                self._warmed_proc_queue.empty()
                and len(self._spawn_tasks) < self._jobs_waiting_for_process
            ):
                # spawn a new process if there are no idle processes
                task = asyncio.create_task(self._proc_spawn_task())
                self._spawn_tasks.add(task)
                task.add_done_callback(self._spawn_tasks.discard)

            proc = await self._warmed_proc_queue.get()
        finally:
            # release the waiting slot even if the caller is cancelled while
            # waiting; a leaked count would make later launches over-spawn
            self._jobs_waiting_for_process -= 1

        await proc.launch_job(info)
        self.emit("process_job_launched", proc)

    def set_target_idle_processes(self, num_idle_processes: int) -> None:
        """Set the idle target used by the background task.

        The background task caps the effective target at the
        ``num_idle_processes`` value given to the constructor.
        """
        self._target_idle_processes = num_idle_processes

    @property
    def target_idle_processes(self) -> int:
        """Idle target last set (initially the constructor's ``num_idle_processes``)."""
        return self._target_idle_processes

    @utils.log_exceptions(logger=logger)
    async def _proc_spawn_task(self) -> None:
        """Create, start and initialize one executor, then queue it as warmed.

        Raises:
            ValueError: If ``job_executor_type`` is neither THREAD nor PROCESS.
        """
        proc: JobExecutor
        if self._job_executor_type == JobExecutorType.THREAD:
            proc = job_thread_executor.ThreadJobExecutor(
                initialize_process_fnc=self._initialize_process_fnc,
                job_entrypoint_fnc=self._job_entrypoint_fnc,
                initialize_timeout=self._initialize_timeout,
                close_timeout=self._close_timeout,
                inference_executor=self._inf_executor,
                ping_interval=2.5,
                high_ping_threshold=0.5,
                http_proxy=self._http_proxy,
                loop=self._loop,
            )
        elif self._job_executor_type == JobExecutorType.PROCESS:
            proc = job_proc_executor.ProcJobExecutor(
                initialize_process_fnc=self._initialize_process_fnc,
                job_entrypoint_fnc=self._job_entrypoint_fnc,
                initialize_timeout=self._initialize_timeout,
                close_timeout=self._close_timeout,
                inference_executor=self._inf_executor,
                mp_ctx=self._mp_ctx,
                loop=self._loop,
                ping_interval=2.5,
                ping_timeout=60,
                high_ping_threshold=0.5,
                memory_warn_mb=self._memory_warn_mb,
                memory_limit_mb=self._memory_limit_mb,
                http_proxy=self._http_proxy,
            )
        else:
            raise ValueError(f"unsupported job executor: {self._job_executor_type}")

        self._executors.append(proc)
        async with self._init_sem:
            # the pool may have been closed while this task waited for the semaphore
            if self._closed:
                self._executors.remove(proc)
                return

            self.emit("process_created", proc)
            await proc.start()
            self.emit("process_started", proc)
            try:
                await proc.initialize()
                # process where initialization times out will never fire "process_ready"
                # neither be used to launch jobs

                self.emit("process_ready", proc)
                self._warmed_proc_queue.put_nowait(proc)
                if self._warmed_proc_queue.qsize() >= self._default_num_idle_processes:
                    self._idle_ready.set()
            except Exception:
                logger.exception("error initializing process", extra=proc.logging_extra())

        # the executor is monitored whether or not initialization succeeded
        monitor_task = asyncio.create_task(self._monitor_process_task(proc))
        self._monitor_tasks.add(monitor_task)
        monitor_task.add_done_callback(self._monitor_tasks.discard)

    @utils.log_exceptions(logger=logger)
    async def _monitor_process_task(self, proc: JobExecutor) -> None:
        """Wait for ``proc`` to finish, emit ``process_closed`` and stop tracking it."""
        try:
            await proc.join()
            self.emit("process_closed", proc)
        finally:
            self._executors.remove(proc)

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        """Top up warmed executors every 0.1 s until closed; clean up on cancellation.

        On cancellation, closes every tracked executor and then waits for the
        outstanding spawn and monitor tasks.
        """
        try:
            while not self._closed:
                # executors already warmed plus those still being spawned
                current_pending = self._warmed_proc_queue.qsize() + len(self._spawn_tasks)
                to_spawn = (
                    min(self._target_idle_processes, self._default_num_idle_processes)
                    - current_pending
                )

                # range() of a non-positive count is empty, so a surplus spawns nothing
                for _ in range(to_spawn):
                    task = asyncio.create_task(self._proc_spawn_task())
                    self._spawn_tasks.add(task)
                    task.add_done_callback(self._spawn_tasks.discard)

                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            await asyncio.gather(*[proc.aclose() for proc in self._executors])
            await asyncio.gather(*self._spawn_tasks)
            await asyncio.gather(*self._monitor_tasks)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Ancestors

Instance variables

prop processes : list[JobExecutor]
Expand source code
@property
def processes(self) -> list[JobExecutor]:
    """Return the pool's executor list (the list object itself, not a copy)."""
    tracked = self._executors
    return tracked
prop target_idle_processes : int
Expand source code
@property
def target_idle_processes(self) -> int:
    """Return the currently stored idle-process target."""
    target = self._target_idle_processes
    return target

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Mark the pool closed and cancel its main task, waiting for it to finish.

    Does nothing when the pool was never started.
    """
    if self._started:
        self._closed = True
        await aio.cancel_and_wait(self._main_atask)
def get_by_job_id(self, job_id: str) ‑> JobExecutor | None
Expand source code
def get_by_job_id(self, job_id: str) -> JobExecutor | None:
    """Return the first executor whose running job has id ``job_id``.

    Executors without a running job are skipped; ``None`` is returned when no
    executor matches.
    """
    for executor in self._executors:
        if executor.running_job and executor.running_job.job.id == job_id:
            return executor
    return None
async def launch_job(self, info: RunningJobInfo) ‑> None
Expand source code
async def launch_job(self, info: RunningJobInfo) -> None:
    """Run ``info`` on a warmed executor, spawning one first if none is idle.

    Waits until a warmed executor is available, launches the job on it and
    emits ``process_job_launched`` with that executor.

    Args:
        info: Job description handed to the executor's ``launch_job``.
    """
    self._jobs_waiting_for_process += 1
    try:
        if (
            self._warmed_proc_queue.empty()
            and len(self._spawn_tasks) < self._jobs_waiting_for_process
        ):
            # spawn a new process if there are no idle processes
            task = asyncio.create_task(self._proc_spawn_task())
            self._spawn_tasks.add(task)
            task.add_done_callback(self._spawn_tasks.discard)

        proc = await self._warmed_proc_queue.get()
    finally:
        # release the waiting slot even if the caller is cancelled while
        # waiting; a leaked count would make later launches over-spawn
        self._jobs_waiting_for_process -= 1

    await proc.launch_job(info)
    self.emit("process_job_launched", proc)
def set_target_idle_processes(self, num_idle_processes: int) ‑> None
Expand source code
def set_target_idle_processes(self, num_idle_processes: int) -> None:
    """Store ``num_idle_processes`` as the pool's idle-process target.

    Args:
        num_idle_processes: New target value; stored as given.
    """
    self._target_idle_processes = num_idle_processes
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Start the pool's main task; a repeated call is a no-op.

    When the configured number of idle processes is greater than zero, this
    also waits for the ``_idle_ready`` event before returning.
    """
    if not self._started:
        self._started = True
        self._main_atask = asyncio.create_task(self._main_task())

        # wait for the idle processes to be warmed up (by the main task)
        if self._default_num_idle_processes > 0:
            await self._idle_ready.wait()

Inherited members