
Chain of responsibility

Chain of responsibility is a pattern that describes how to build a chain of objects, often called "handlers", each of which may be able to handle a specific request. The request is usually sent by the client and goes to the first element of the chain. If that element can handle the request, processing stops there and the rest of the chain is never contacted. If an element cannot handle the request, then, unless it is the last element of the chain, it passes the responsibility for handling it to the next element.
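
The flow described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: the handler functions `handle_small`, `handle_medium` and `handle_any`, the `dispatch` function, and the `visited` list are hypothetical names introduced here, and a plain list iterated in order stands in for explicit links between chain elements.

```python
# Hypothetical handlers: each returns a result if it can handle the
# request, or None to pass the responsibility further down the chain.
visited = []  # records which handlers were actually contacted


def handle_small(request):
    visited.append("small")
    return "small" if request < 10 else None


def handle_medium(request):
    visited.append("medium")
    return "medium" if request < 100 else None


def handle_any(request):
    visited.append("any")
    return "any"


def dispatch(chain, request):
    """Offer the request to each handler in order; stop at the first success."""
    for handler in chain:
        result = handler(request)
        if result is not None:
            return result  # the rest of the chain is never contacted
    return None  # even the last handler declined


chain = [handle_small, handle_medium, handle_any]
print(dispatch(chain, 42))  # handled by the second handler
print(visited)              # the third handler does not appear here
```

The point of the `visited` list is to make the early exit visible: once a handler succeeds, the handlers after it are not called at all.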

Design

In order to create the chain described above, we need the following elements:

  • a common interface for handlers; each handler may attempt to service the request at its position in the chain
  • concrete handler implementations; their number usually determines the number of elements in the chain
  • a class representing an element of the chain, i.e. one that uses a specific handler implementation and points to the next element.
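
The three elements listed above could look roughly like this. It is a generic sketch, not the example from this article: the names `Handler`, `EvenNumberHandler`, `NegativeNumberHandler` and `ChainElement` are assumptions chosen only to show the structure, and the requests are plain integers.

```python
from abc import ABC, abstractmethod


class Handler(ABC):
    """Common interface: a handler tries to service a request."""

    @abstractmethod
    def handle(self, request):
        """Return True if the request was handled, False otherwise."""


class EvenNumberHandler(Handler):
    def handle(self, request):
        return request % 2 == 0


class NegativeNumberHandler(Handler):
    def handle(self, request):
        return request < 0


class ChainElement:
    """One link of the chain: wraps a handler and points to the next link."""

    def __init__(self, handler, next_element=None):
        self._handler = handler
        self._next_element = next_element

    def handle(self, request):
        if self._handler.handle(request):
            return True
        if self._next_element is None:
            return False  # end of the chain, nobody handled the request
        return self._next_element.handle(request)


# Build the chain back to front so every link knows its successor.
chain = ChainElement(EvenNumberHandler(), ChainElement(NegativeNumberHandler()))
print(chain.handle(4))   # True: handled by the first link
print(chain.handle(-3))  # True: passed on to the second link
print(chain.handle(3))   # False: no link could handle it
```

Keeping the handler and the chain element as separate classes means a handler knows nothing about its position in the chain, so the same handler can be reused in differently ordered chains.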



Example

The example below shows the Chain of responsibility pattern in use. The request is a user's login attempt. There are three authentication methods:

  • with a username and password
  • using a Bearer token
  • using an AWS token

All three implementations are simplified for the sake of the example.

The implementation consists of the following elements:

  • Credentials, the interface representing the data used to log in. It has three implementations:
    • AWSSignature
    • BearerToken
    • UserNameAndPasswordCredentials
  • AuthenticationHandler, representing the common handler interface. It has three implementations:
    • AWSAuthenticationHandler
    • BearerTokenAuthenticationHandler
    • UserNameAndPasswordAuthenticationHandler
  • ChainAuthenticationElement, which is a single element of the chain. It stores the handler it uses and the next element, and contains the authentication logic.
  • a usage example that builds a chain from three elements and then tries to log in through that chain with each of the available Credentials objects.

import random


class Credentials:
    """Common interface for the data used to log in."""

    def get_credentials(self, user_id):
        raise NotImplementedError


class AWSSignature(Credentials):
    def get_credentials(self, user_id):
        return "98f92d42-66c7-4ce4-a834-087a783133e7"


class BearerToken(Credentials):
    def get_credentials(self, user_id):
        return "1/mZ1edKKACtPAb7zGlwSzvs72PvhAbGmB8K1ZrGxpcNM"


class UserNameAndPasswordCredentials(Credentials):
    def get_credentials(self, user_id):
        return "andrew:Andrew_123"


class AuthenticationHandler:
    """Common handler interface implemented by every authentication method."""

    def authenticate(self, credentials):
        raise NotImplementedError

    def supports(self, credentials):
        raise NotImplementedError


class AWSAuthenticationHandler(AuthenticationHandler):
    def authenticate(self, credentials):
        if self.supports(credentials):
            return self.authenticate_in_aws(credentials)
        return False

    def supports(self, credentials):
        return isinstance(credentials, AWSSignature)

    def authenticate_in_aws(self, credentials):
        # Simplified for the example: the result is random, not a real check.
        return random.randint(1, 3) == 1


class BearerTokenAuthenticationHandler(AuthenticationHandler):
    def authenticate(self, credentials):
        if self.supports(credentials):
            return self.is_token_valid(credentials)
        return False

    def supports(self, credentials):
        return isinstance(credentials, BearerToken)

    def is_token_valid(self, credentials):
        # Simplified for the example: the result is random, not a real check.
        return random.randint(1, 3) == 2


class UserNameAndPasswordAuthenticationHandler(AuthenticationHandler):
    def authenticate(self, credentials):
        if self.supports(credentials):
            return self.is_password_valid(credentials)
        return False

    def supports(self, credentials):
        return isinstance(credentials, UserNameAndPasswordCredentials)

    def is_password_valid(self, credentials):
        # Simplified for the example: the result is random, not a real check.
        return random.randint(1, 3) == 3


class ChainAuthenticationElement:
    def __init__(self, authentication_handler, next_element=None):
        self._authentication_handler = authentication_handler
        self._next_element = next_element

    def authenticate(self, credentials):
        if self._authentication_handler.authenticate(credentials):
            handler_name = self._authentication_handler.__class__.__name__
            print(f"Authentication using handler {handler_name}")
            return True
        if self._next_element is None:
            # End of the chain: no handler accepted the credentials.
            return False
        return self._next_element.authenticate(credentials)


def main():
    authentication_handler_unp = UserNameAndPasswordAuthenticationHandler()
    authentication_handler_bearer = BearerTokenAuthenticationHandler()
    authentication_handler_aws = AWSAuthenticationHandler()

    last_element = ChainAuthenticationElement(authentication_handler_aws)
    middle_element = ChainAuthenticationElement(authentication_handler_bearer, last_element)
    first_element = ChainAuthenticationElement(authentication_handler_unp, middle_element)

    first_element.authenticate(AWSSignature())
    first_element.authenticate(UserNameAndPasswordCredentials())
    first_element.authenticate(BearerToken())


if __name__ == '__main__':
    main()
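
In the usage example the chain is assembled by hand, starting from the last element. One possible convenience, sketched here under assumptions, is a helper that links a list of handlers in order. `build_chain` is a hypothetical function, `ChainElement` is a simplified stand-in for ChainAuthenticationElement, and the handlers are plain callables operating on strings rather than the classes from the example.

```python
class ChainElement:
    """Simplified stand-in for ChainAuthenticationElement."""

    def __init__(self, handler, next_element=None):
        self.handler = handler
        self.next_element = next_element

    def authenticate(self, credentials):
        if self.handler(credentials):
            return True
        if self.next_element is None:
            return False
        return self.next_element.authenticate(credentials)


def build_chain(handlers):
    """Link handlers in the given order and return the first element."""
    first = None
    # Walk the list backwards so each new element can point to its successor.
    for handler in reversed(handlers):
        first = ChainElement(handler, first)
    return first


# Hypothetical handlers: plain callables that accept one kind of credentials.
def accepts_password(credentials):
    return credentials.startswith("password:")


def accepts_token(credentials):
    return credentials.startswith("token:")


chain = build_chain([accepts_password, accepts_token])
print(chain.authenticate("token:abc"))     # True, handled by the second element
print(chain.authenticate("password:xyz"))  # True, handled by the first element
print(chain.authenticate("unknown"))       # False, nobody accepted it
```

With such a helper the order of the handlers in the list is the order in which they are tried, which is easier to read than nesting constructors back to front.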