Source code for autowire.helpers
"""
autowire.helpers
================
Helpers.
"""
import contextlib
import functools
from autowire.base import BaseContext, BaseResource
from .builtins import this
from .utils import RefCounter
[docs]def autowired(fn, *dependencies: BaseResource):
"""
Convert plain contextmanager to resource implementation by autowiring
dependencies. ::
foo = Resource('foo', __name__)
bar = Resource('bar', __name__)
@contextlib.contextmanager
def with_foo(bar):
value = baz(bar)
try:
yield value
finally:
value.teardown()
impl = autowired(with_foo, bar) # Convert to implementation.
foo.impl(impl)
"""
@contextlib.contextmanager
def implementation(context: BaseContext):
# Resolve dependencies using context.
def with_dependencies(fn, dependencies):
if dependencies:
first = dependencies[0]
rest = dependencies[1:]
with context.resolve(first) as value:
fn = functools.partial(fn, value)
yield from with_dependencies(fn, rest)
else:
with fn() as value:
yield value
# Call implemenation with autowired resources.
yield from with_dependencies(fn, dependencies)
return implementation
[docs]def shared(impl):
"""
Convert implementation to shared resource implementation.
Shared resource shares contexxt using reference counting.
When you try to resolve shared resource that already resolve but not teared
down, It brings previously resolved resouce. ::
from contextlib import contextmanager
from autowire import Context, Resource
from autowire.decorators import shared
dog = Resource('dog', __name__)
walk = Resource('walk', __name__)
@dog.impl
@shared
@contextmanager
def with_dog(context):
print("Dog is entering")
try:
yield "🐶"
finally:
print("Dog leaved")
@walk.impl
@contextmanager
def with_walking(context):
with context.resolve(dog) as dog_value:
yield "Walking with {}".format(dog_value)
context = Context()
with context.resolve(walk) as message:
print(message)
with context.resolve(dog) as dog_value:
print("Feeding {}".format(dog_value))
# Output:
# Dog is entering
# Walking with 🐶
# Feeding 🐶
# Dog leaved
"""
counters = {}
@functools.wraps(impl)
@contextlib.contextmanager
def wrapper(context: BaseContext):
if context not in counters:
counters[context] = RefCounter(impl(context))
counter = counters[context]
try:
with counter as value:
yield value
finally:
if counter.count == 0:
del counters[context]
return wrapper
[docs]def globally_shared(impl):
"""
Convert implementation to globally shared resource implementation.
Which "globally shared" means resource is shared in providing context.
Shared resource shares contexxt using reference counting.
When you try to resolve shared resource that already resolve but not teared
down, It brings previously resolved resouce.
The resource will be tied into resource's providing context.
It means that children's context will not be able to be used
in this resource. ::
res1 = Resource('res1', __name__)
res2 = Resource('res2', __name__)
@res2.impl
@globally_shared
@contextlib.contextmanager
def with_res1(context):
with context.resolve(res2) as res1_factory:
res1 = res1_factory.create_res1()
try:
yield res1
finally:
res1.teardown()
context = Context()
child = Context(context)
@child.autowired(res2)
def with_res2():
yield ...
# Get ResourceNotProvidedError, even though child provides res2.
# Because res1 was shared in parent context.
with child.resolve(res1):
...
"""
counter = None
@functools.wraps(impl)
@contextlib.contextmanager
def wrapper(context: BaseContext):
nonlocal counter
with context.resolve(this) as resource:
if counter is None:
provider = context.provided_by(resource) # Find me
counter = RefCounter(impl(provider))
try:
with counter as value:
yield value
finally:
if counter.count == 0:
counter = None
return wrapper