Code Compilation

Internally when Context.resolve is called aioinject compiled whole dependency graph into a single function and then uses it to resolve that specific type.

For example given a setup like this

import asyncio
import contextlib
from collections.abc import AsyncIterator
from datetime import datetime
from typing import NewType

from aioinject import (
    Container,
    Object,
    Scoped,
    Singleton,
    Transient,
)


class SingletonClient:
    pass


class DBConnection:
    pass


@contextlib.asynccontextmanager
async def setup_db_connection() -> AsyncIterator[DBConnection]:
    yield DBConnection()


Now = NewType("Now", datetime)


class Service:
    def __init__(
        self,
        now_a: Now,
        now_b: Now,
        int_object: int,
        connection: DBConnection,
        client: SingletonClient,
    ) -> None:
        self._now_a = now_a
        self._now_b = now_b
        self._int = int_object
        self._connection = connection
        self._client = client


async def main() -> None:
    container = Container()
    container.register(
        Singleton(SingletonClient),
        Scoped(setup_db_connection),
        Object(42),
        Transient(lambda: datetime.now(), interface=Now),  # noqa: DTZ005
        Scoped(Service),
    )

    async with container, container.context() as context:
        await context.resolve(Service)


if __name__ == "__main__":
    asyncio.run(main())

Generated factory function would look like this:

async def factory(scopes: "Mapping[BaseScope, Context]") -> "T":
    lifetime_scope_cache = scopes[lifetime_scope].cache # (1)!
    lifetime_scope_exit_stack = scopes[lifetime_scope].exit_stack
    request_scope_cache = scopes[request_scope].cache
    request_scope_exit_stack = scopes[request_scope].exit_stack

    Service_now_a_Now_instance = Service_now_a_Now_provider.provide({})
    Service_now_b_Now_instance = Service_now_b_Now_provider.provide({})

    int_instance = int_provider.provide({})

    if ( # (2)!
        DBConnection_instance := request_scope_cache.get(
            DBConnection_type, NotInCache
        )
    ) is NotInCache:
        DBConnection_instance = (
            await request_scope_exit_stack.enter_async_context(
                DBConnection_provider.provide({})
            )
        )
        request_scope_cache[DBConnection_type] = DBConnection_instance # (3)!

    if (
        SingletonClient_instance := lifetime_scope_cache.get(
            SingletonClient_type, NotInCache
        )
    ) is NotInCache:
        async with scopes[lifetime_scope].lock: # (4)!
            if (
                SingletonClient_instance := lifetime_scope_cache.get(
                    SingletonClient_type, NotInCache
                )
            ) is NotInCache:
                SingletonClient_instance = SingletonClient_provider.provide({})
                lifetime_scope_cache[SingletonClient_type] = (
                    SingletonClient_instance
                )

    if (
        Service_instance := request_scope_cache.get(Service_type, NotInCache)
    ) is NotInCache:
        Service_instance = Service_provider.provide(
            {
                "now_a": Service_now_a_Now_instance,
                "now_b": Service_now_b_Now_instance,
                "int_object": int_instance,
                "connection": DBConnection_instance,
                "client": SingletonClient_instance,
            }
        )
        request_scope_cache[Service_type] = Service_instance

    return Service_instance
  1. Used scope variables are set up
  2. Relevant scope's cache is checked to see if dependency was already provided before
  3. Provided instance is cached
  4. Concurrent-sensitive providers are resolved under lock, also double-checked locking is used

Note

Usually object id is appended to variable name (e.g. DBConnection_140734497381936) to avoid name conflicts, here they're cleaned up.