Gate access to a middleware.
The gate callback will be called with the current message.
Add the middleware to your bus.
def gate(message: banshee.Message[typing.Any]) -> bool:
    return isinstance(message.request, FooMixin)

bus = (
    banshee.builder()
    .with_middleware(banshee.GateMiddleware(FooHandler(), gate))
    .with_locator(registry)
    .build()
)
- class banshee.Gate(*args, **kwargs)
Check whether the decorated middleware should be used.
- class banshee.GateMiddleware(inner, gate)
Middleware decorator that conditionally invokes the decorated middleware depending on the result of a gate callback.
When the callback passes, the decorated middleware is inserted next in the chain before processing continues.
- async __call__(message, handle)
Forward the message to the inner middleware or to the next handler in the chain, depending on the result of the gate callback.
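The forwarding behaviour described above can be illustrated with a small standalone sketch. It does not import banshee and is not the library's implementation: `Message`, `SketchGateMiddleware`, `tagging_middleware`, `final_handler` and `is_str_request` are all hypothetical names, assumed only for illustration. It shows the general pattern: call the inner middleware when the gate passes, otherwise go straight to the next handler.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


# Hypothetical stand-in for a bus message; not banshee's actual type.
@dataclass
class Message:
    request: Any


HandleFn = Callable[[Message], Awaitable[Any]]
GateFn = Callable[[Message], bool]
Middleware = Callable[[Message, HandleFn], Awaitable[Any]]


class SketchGateMiddleware:
    """Illustrative gate decorator: run `inner` only when `gate` passes."""

    def __init__(self, inner: Middleware, gate: GateFn) -> None:
        self.inner = inner
        self.gate = gate

    async def __call__(self, message: Message, handle: HandleFn) -> Any:
        if self.gate(message):
            # Gate passed: the inner middleware runs and then continues the chain.
            return await self.inner(message, handle)
        # Gate failed: skip the inner middleware and call the next handler directly.
        return await handle(message)


async def tagging_middleware(message: Message, handle: HandleFn) -> Any:
    # Hypothetical inner middleware that wraps the downstream result.
    result = await handle(message)
    return f"tagged({result})"


async def final_handler(message: Message) -> Any:
    # Hypothetical end of the chain.
    return f"handled {message.request!r}"


def is_str_request(message: Message) -> bool:
    # Gate callback: only string requests reach the inner middleware.
    return isinstance(message.request, str)


async def demo() -> list:
    gated = SketchGateMiddleware(tagging_middleware, is_str_request)
    return [
        await gated(Message("hello"), final_handler),
        await gated(Message(42), final_handler),
    ]


results = asyncio.run(demo())
print(results)
```

In this sketch the string request is wrapped by the inner middleware, while the integer request bypasses it and reaches the final handler unchanged.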