Zero Downtime API Shared Secret Rotation

April 29, 2023 - 15 minute read -

Secrets are used in software applications for many different things from connecting to a database to signing a JWT passed between services. It is considered a security best practice to rotate these types of secrets on a regular basis. Some reasons for this are:

  • If a secret is accidentally exposed in logs, committed to version control, etc., having a process in place that smoothly rotates the secret turns what would be a fire drill into a much less risky operation
  • Rotating secrets regularly helps ensure that if a malicious insider acquires a secret, the window in which it remains usable is minimized

It is common for applications to have secrets provided to them through configuration or the environment, and for those secrets to rarely change, if ever. Some secrets are harder to rotate than others, but with a little planning the tools exist to make secret rotation possible, possibly even with zero downtime. AWS provides some good documentation on retrieving secrets from Secrets Manager and Parameter Store as well as some examples of dynamic credential rotation. We can use this information to put together a solution for rotating credentials within our applications. Let's get started.

A Shared Secret

Let's suppose we have application A and application B. In the spirit of zero-trust we aren't going to just let the applications trust the network and communicate without any form of authentication or authorization. Instead, the services will use a shared secret. This could be used for a JWT HMAC or simply as a token for bearer authentication. To keep things simple, we'll use bearer authentication in this demonstration.
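
If the shared secret were used for HMAC signing rather than as a bearer token, the idea might look like the following minimal sketch. It uses only Python's standard library; the function names and the example secret are illustrative assumptions and are not part of the demonstration that follows.

```python
import hashlib
import hmac

def sign(message: bytes, shared_secret: bytes) -> str:
    """Return a hex HMAC-SHA256 signature of message using the shared secret."""
    return hmac.new(shared_secret, message, hashlib.sha256).hexdigest()

def verify(message: bytes, signature: str, shared_secret: bytes) -> bool:
    """Recompute the signature and compare it in constant time."""
    return hmac.compare_digest(sign(message, shared_secret), signature)

secret = b'example-shared-secret'  # illustrative value only
sig = sign(b'hello from A', secret)
print(verify(b'hello from A', sig, secret))       # True
print(verify(b'hello from A', sig, b'WrongKey'))  # False
```

Both services need the same secret for this to work, which is exactly why rotating it without downtime takes some care.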

Storing and Sharing the Secret

For this example we will use AWS SSM Parameter Store for storing the shared secret. This could also be AWS Secrets Manager, GCP Secret Manager, Azure Key Vault, etc. The important thing is that the secret is stored centrally in a secrets manager and the applications have the appropriate privileges so that they can access it.

NOTE: AWS Parameter Store and Secrets Manager each have their own pluses and minuses, so read up on the differences to select the one that fits your use case best.

We’ll use the SecureString type when configuring the parameter to ensure that the secret is encrypted with KMS, and we’ll set its initial value to MyInitialSecret.
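
The parameter can be created in the console, but the same step could also be scripted. The sketch below assumes a boto3 SSM client is passed in (for example `boto3.client('ssm', 'us-east-1')`); the helper name `create_secret_parameter` is hypothetical.

```python
def create_secret_parameter(ssm, name, value):
    """Create a SecureString parameter and return its version number.

    `ssm` is expected to be a boto3 SSM client.
    """
    response = ssm.put_parameter(
        Name=name,
        Value=value,
        Type='SecureString',  # encrypted with KMS (the default key unless KeyId is given)
        Overwrite=False,      # fail rather than silently replace an existing parameter
    )
    return response['Version']
```

Calling `create_secret_parameter(ssm, 'RotationExample', 'MyInitialSecret')` against a real account should return 1 for a brand new parameter, since Parameter Store starts version numbering at 1.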

Initial AWS SSM Parameter

The API

The first thing we will do is create a small Python Flask API that has an endpoint protected with bearer authentication. The secret is acquired from the environment the way most applications typically would:

import os
from flask import Flask
from flask_httpauth import HTTPTokenAuth

app = Flask(__name__)
auth = HTTPTokenAuth(scheme='Bearer')

@auth.verify_token
def verify_token(token):
    if token == os.getenv('SECRET_TOKEN'):
        return True

@app.route('/')
@auth.login_required
def main():
    return 'Open Sesame!'
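
One hardening step worth considering: comparing tokens with `==` can in principle leak timing information, and the handler above returns None rather than False on a mismatch. A minimal sketch of a constant-time comparison using the standard library follows; the helper name `token_matches` is an assumption and is not used in the rest of the post.

```python
import hmac
import os

def token_matches(token, expected):
    """Compare a presented token to the expected secret in constant time."""
    # Reject empty or missing values up front (e.g. SECRET_TOKEN not set)
    if not token or not expected:
        return False
    return hmac.compare_digest(token.encode('utf-8'), expected.encode('utf-8'))

def verify_token(token):
    # Always returns an explicit True or False
    return token_matches(token, os.getenv('SECRET_TOKEN'))
```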

Next, we’ll create a SecretsCache class that:

  • Has a method to pull / refresh credentials from AWS Parameter Store
  • Has a method to get a secret from the object
  • Acquires AWS parameter store credentials when the object is instantiated
  • Has a ttl (time to live) feature to automatically refresh secrets when they become stale
  • Leverages “current” and “previous” versions of secrets to allow for rolling updates

import boto3
from datetime import datetime, timedelta

class SecretsCache:
    # Acquire the secrets from AWS when the object is instantiated
    def __init__(self, keys, region, ttl=1):
        # Initialize a dictionary for each secret key name
        self.secrets    = {}
        for key in keys:
            self.secrets[key] = {}

        self.region     = region
        # Set a 1 minute ttl on refreshing secrets
        self.ttl        = ttl
        self.updated_at = None

        # Fetch the secrets
        self.refresh()

    # Pull the secrets from Parameter Store
    def refresh(self):
        print("Retrieving secrets from Parameter Store")

        # Get current and previous versions of each secret
        for key in self.secrets:
            self.secrets[key] = self.__get_secret_versions(key)

        # Set the updated_at property to now
        self.updated_at = datetime.now()

    # Method to get a secret from the object
    # Default to getting the Current secret
    def secret(self, key, version='Current'):
        # Check if our secrets need a refresh on access
        if self.__stale_secrets():
            self.refresh()

        return self.secrets[key][version]

    def __stale_secrets(self):
        if datetime.now() > self.updated_at + timedelta(minutes = self.ttl):
            print("TTL expired!")
            return True
        else:
            return False

    # Looks up current and previous version for a secret
    def __get_secret_versions(self, key):
        values = {'Current': None, 'Previous': None}

        ssm = boto3.client('ssm', self.region)

        response = ssm.get_parameter(
                Name=key,WithDecryption=True
        )

        if response and response['Parameter']['Value']:
            values['Current'] = response['Parameter']['Value']

            # Parameter store increments the version each time a secret is
            # updated and it starts with 1 for the first secret
            # Because of this we only lookup a previous version if the secret is
            # the second version or more
            if response['Parameter']['Version'] > 1:
                response = ssm.get_parameter(
                        Name=f'{key}:{response["Parameter"]["Version"] - 1}',
                        WithDecryption=True
                )

                if response and response['Parameter']['Value']:
                    values['Previous'] = response['Parameter']['Value']

        return values

The reason for the ttl in the class is that if the server is never restarted it will never know that a secret has been rotated. It needs something to wake it up and refresh its secrets. The reason for fetching and using both the current and previous versions of a secret is that in a production environment not all clients and servers will necessarily receive updated credentials at the same time. By holding both the previous and current secrets, the server can support a client that hasn’t received the updated secret yet.
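
To see the staleness check in isolation, here is a small sketch that drives the clock by hand instead of waiting for a minute to pass. It is a variation on the class above, not the same code: it measures the ttl in seconds and uses `time.monotonic` by default, which is not affected by wall-clock adjustments. The name `TtlFlag` and the injectable `clock` parameter are assumptions made for illustration.

```python
import time

class TtlFlag:
    """Tracks whether cached data is older than ttl_seconds."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self.updated_at = clock()

    def is_stale(self):
        return self.clock() - self.updated_at > self.ttl_seconds

    def touch(self):
        # Call this after a successful refresh
        self.updated_at = self.clock()

# Drive the clock by hand to show the transition without waiting
now = [0.0]
flag = TtlFlag(ttl_seconds=60, clock=lambda: now[0])
print(flag.is_stale())  # False: just refreshed
now[0] = 61.0
print(flag.is_stale())  # True: older than the ttl
flag.touch()
print(flag.is_stale())  # False again after a refresh
```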

Now, we’ll update the API code from before to create an instance of this class when the app boots, and to compare the token against the secret stored in that object instead of the one from the environment. We’ll also update the token validation to try the current secret, then the previous secret, and if both fail, refresh and retry the current secret:

.. snip ..
import secretscache as sc

# Initialize some globals
api_token_key = 'RotationExample'
aws_region    = 'us-east-1'

app = Flask(__name__)
auth = HTTPTokenAuth(scheme='Bearer')

# Load up our secrets on boot
s = sc.SecretsCache([api_token_key], aws_region)

@auth.verify_token
def verify_token(token):
    print(f'Using token: {token} for demonstration purposes only. Don\'t do this in production!')
    # Try the most current secret we have in memory
    if token == s.secret(api_token_key):
        print(f'Current token match!')
        return True
    elif token == s.secret(api_token_key, 'Previous'):
        print(f'Previous token match!')
        return True
    else:
        # If current and previous fail, go refresh secrets from Parameter Store
        # Retry current which may now be new if it was updated
        print(f'Token mis-match. Refreshing token')
        s.refresh()
        if token == s.secret(api_token_key):
            return True
    return False

.. snip ..

The reason we refresh and retry the current secret after trying the current and previous secrets is that a client may have received an updated secret before the server did. In that case neither the current nor the previous secret would match the new secret the client holds. By refreshing the secrets on the server at this point, the server should receive the newly updated secret, which becomes the new current, and the request should succeed.
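
One side effect of refreshing on every mismatch is that each request with a bad token triggers a Parameter Store lookup. A possible mitigation, sketched below under the assumption that a few seconds of delay is acceptable, is to allow a forced refresh only once per interval. The class name `RefreshThrottle` and its parameters are hypothetical and not part of the sample code.

```python
import time

class RefreshThrottle:
    """Allows a forced refresh at most once per min_interval seconds."""

    def __init__(self, refresh, min_interval=5.0, clock=time.monotonic):
        self.refresh = refresh            # e.g. the cache's refresh method
        self.min_interval = min_interval
        self.clock = clock
        self.last_forced = None

    def maybe_refresh(self):
        now = self.clock()
        if self.last_forced is not None and now - self.last_forced < self.min_interval:
            return False                  # refreshed recently, skip the lookup
        self.last_forced = now
        self.refresh()
        return True
```

In the token check, the direct call to the cache's refresh method in the mismatch branch could then be replaced with a call to `maybe_refresh()` on a throttle built around that method.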

Putting it all together, the final code looks like this:

secretscache.py

import boto3
from datetime import datetime, timedelta

class SecretsCache:
    # Acquire the secrets from AWS when the object is instantiated
    def __init__(self, keys, region, ttl=1):
        # Initialize a dictionary for each secret key name
        self.secrets    = {}
        for key in keys:
            self.secrets[key] = {}

        self.region     = region
        # Set a 1 minute ttl on refreshing secrets
        self.ttl        = ttl
        self.updated_at = None

        # Fetch the secrets
        self.refresh()

    # Pull the secrets from Parameter Store when object instantiated
    def refresh(self):
        print("Retrieving secrets from Parameter Store")

        # Get current and previous versions of each secret
        for key in self.secrets:
            self.secrets[key] = self.__get_secret_versions(key)

        # Set the updated_at property to now
        self.updated_at = datetime.now()

    # Method to get a secret from the object
    # Default to getting the Current secret
    def secret(self, key, version='Current'):
        # Check if our secrets need a refresh on access
        if self.__stale_secrets():
            self.refresh()

        return self.secrets[key][version]

    def __stale_secrets(self):
        if datetime.now() > self.updated_at + timedelta(minutes = self.ttl):
            print("TTL expired!")
            return True
        else:
            return False

    # Looks up current and previous version for a secret
    def __get_secret_versions(self, key):
        values = {'Current': None, 'Previous': None}

        ssm = boto3.client('ssm', self.region)

        response = ssm.get_parameter(
                Name=key,WithDecryption=True
        )

        if response and response['Parameter']['Value']:
            values['Current'] = response['Parameter']['Value']

            # Parameter store increments the version each time a secret is
            # updated and it starts with 1 for the first secret
            # Because of this we only lookup a previous version if the secret is
            # the second version or more
            if response['Parameter']['Version'] > 1:
                response = ssm.get_parameter(
                        Name=f'{key}:{response["Parameter"]["Version"] - 1}',
                        WithDecryption=True
                )

                if response and response['Parameter']['Value']:
                    values['Previous'] = response['Parameter']['Value']

        return values

server.py:

from flask import Flask
from flask_httpauth import HTTPTokenAuth
import secretscache as sc

# Initialize some globals
api_token_key = 'RotationExample'
aws_region    = 'us-east-1'

# Initialize a Flask app 
app = Flask(__name__)
auth = HTTPTokenAuth(scheme='Bearer')

# Load up our secrets on boot
s = sc.SecretsCache([api_token_key], aws_region)

@auth.verify_token
def verify_token(token):
    print(f'Using token: {token} for demonstration purposes only. Don\'t do this in production!')
    # Try the most current secret we have in memory
    if token == s.secret(api_token_key):
        print(f'Current token match!')
        return True
    elif token == s.secret(api_token_key, 'Previous'):
        print(f'Previous token match!')
        return True
    else:
        # If current and previous fail, go refresh secrets from Parameter Store
        # Retry current which may now be new if it was updated
        print(f'Token mis-match. Refreshing token')
        s.refresh()
        if token == s.secret(api_token_key):
            return True
    return False

@app.route('/')
@auth.login_required
def main():
    return 'Open Sesame!'

Alright! Let’s give it a shot and boot up our Flask API:

export FLASK_APP=server
export FLASK_ENV=development

╰─❯ flask run
Retrieving secrets from Parameter Store
 * Serving Flask app 'server'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:5000
Press CTRL+C to quit

In the output we see the “Retrieving secrets from Parameter Store” message, so presumably the secrets are pulled correctly when the app boots. Now we’ll use curl to call our API with the initial secret we set, MyInitialSecret, to test things out:

curl -H 'Authorization: Bearer MyInitialSecret' http://127.0.0.1:5000
Open Sesame!%

The call succeeds as expected as the app pulled the secret when it booted and it matches the secret we send in the authorization header. In the output on the server side we see:

Using token: MyInitialSecret for demonstration purposes only. Don't do this in production!
Current token match!

Now for the magic. We will go to Parameter Store and update the secret to a new value of MyNewSecret:

Updated AWS SSM Parameter
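
The update above was done by hand in the console. As a rough sketch, the same rotation could be scripted by overwriting the parameter with a freshly generated value, which creates the next version while the prior one stays retrievable by version number. The helper name `rotate_secret_parameter` is hypothetical, and `ssm` is assumed to be a boto3 SSM client.

```python
import secrets

def rotate_secret_parameter(ssm, name, nbytes=32):
    """Write a new random value as the next version of an existing parameter.

    `ssm` is expected to be a boto3 SSM client. Returns the new version number.
    """
    new_value = secrets.token_urlsafe(nbytes)
    response = ssm.put_parameter(
        Name=name,
        Value=new_value,
        Type='SecureString',
        Overwrite=True,   # creates version N+1; version N stays readable as name:N
    )
    return response['Version']
```

Unlike the demonstration, this generates a random secret rather than a fixed value such as MyNewSecret, so clients would have to read it from Parameter Store rather than know it in advance.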

And without restarting our server (because that would be cheating ;)) we use curl again with the updated secret:

╰─❯ curl -H 'Authorization: Bearer MyNewSecret' http://127.0.0.1:5000
Open Sesame!%

The request succeeds with the new secret! Let’s look at the server logs. We see:

Using token: MyNewSecret for demonstration purposes only. Don't do this in production!
TTL expired!
Retrieving secrets from Parameter Store
Previous token match!
127.0.0.1 - - [29/Apr/2023 18:01:00] "GET / HTTP/1.1" 200 -

The output may differ depending on whether the secret is rotated and curl is executed before or after the ttl has expired. In this case the ttl had expired, so when the new secret was presented the server refreshed its secrets from Parameter Store and compared the client's token against the refreshed values, which matched, so the request succeeds.

And just to prove we don’t have anything up our sleeve, we’ll make another request with a bogus secret:

╰─❯ curl -H 'Authorization: Bearer FakeSecret' http://127.0.0.1:5000
Unauthorized Access%

And as we would expect, we are denied access.

What About The Client

You might wonder what the client side code might look like to support the secret rotation. In actuality the client side code could simply use a class like the SecretsCache class as is. For example:

client.py

import sys
import time

import requests
import secretscache as sc

# Initialize some globals
api_token_key = 'RotationExample'
aws_region    = 'us-east-1'

api_endpoint = 'http://localhost:5000/'

def main():
    s = sc.SecretsCache([api_token_key], aws_region)

    # Loop forever, making a request every 5 seconds
    while True:
        # Read the secret once per iteration so the log line and the request agree
        token = s.secret(api_token_key)
        print(f'Trying to access api using secret: {token}. For demonstration purposes only. Don\'t log secrets!')
        r = requests.get(api_endpoint, headers={'Authorization': f'Bearer {token}'})
        print(f'Request was {r.status_code}')
        time.sleep(5)

if __name__ == '__main__':
    sys.exit(main())

Here we have a simple Python script that makes a request in a loop every 5 seconds. The script leverages our SecretsCache class again but needs no other special code to support the secret rotation. The class has the ttl logic built into it so it will refresh the secrets every so often. The client doesn’t need to be aware of current and previous versions so long as the server handles it, which it is doing in our example. We can start up our server and client and rotate the secret in AWS and when the ttl expires, we can see the magic happen:
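
If the client's ttl were much longer than the server's, a request could still be rejected in the gap. As an optional extra, and only as a sketch, the client could refresh and retry once when it sees a 401. The helper name `call_with_refresh` and the `send` callable are assumptions introduced here; `cache` stands for a SecretsCache-like object.

```python
def call_with_refresh(send, cache, key):
    """Call send(token); on a 401, refresh the cache and retry once.

    `send` is any callable that takes a bearer token and returns an object
    with a `status_code` attribute (for example a requests.Response).
    `cache` is a SecretsCache-like object with secret() and refresh().
    """
    response = send(cache.secret(key))
    if response.status_code == 401:
        # Our copy may be out of date: fetch the latest secret and try again
        cache.refresh()
        response = send(cache.secret(key))
    return response
```

With the requests library, `send` could be a small function that issues the GET with the `Authorization: Bearer` header built from the token it is given.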

In the client console when we rotate the secret, we see:

.. snip ..

Trying to access api using secret: Abracadabra. For demonstration purposes only. Don't log secrets!
Request was 200
Trying to access api using secret: Abracadabra. For demonstration purposes only. Don't log secrets!
Request was 200
TTL expired!
Retrieving secrets from Parameter Store
Trying to access api using secret: AlaKazaam!. For demonstration purposes only. Don't log secrets!
Request was 200
Trying to access api using secret: AlaKazaam!. For demonstration purposes only. Don't log secrets!
Request was 200

.. snip ..

And in the server console we see:

.. snip ..

Current token match!
127.0.0.1 - - [06/May/2023 21:08:57] "GET / HTTP/1.1" 200 -
Using token: Abracadabra for demonstration purposes only. Don't do this in production!
Current token match!
127.0.0.1 - - [06/May/2023 21:09:02] "GET / HTTP/1.1" 200 -
Using token: Abracadabra for demonstration purposes only. Don't do this in production!
TTL expired!
Retrieving secrets from Parameter Store
Previous token match!
127.0.0.1 - - [06/May/2023 21:09:07] "GET / HTTP/1.1" 200 -
Using token: AlaKazaam! for demonstration purposes only. Don't do this in production!
Current token match!
127.0.0.1 - - [06/May/2023 21:09:13] "GET / HTTP/1.1" 200 -
Using token: AlaKazaam! for demonstration purposes only. Don't do this in production!
Current token match!

.. snip ..

We can see that after rotating the secret from Abracadabra to AlaKazaam!, the client picks up and uses the new secret once its ttl expires. On the server side, the ttl has also expired, so the new secret is picked up and compared with AlaKazaam!, which matches, so the request succeeds.

Conclusion

I hope this post has helped to make secret rotation a little less intimidating and more approachable. The SecretsCache class described in the post can easily be modified to work with AWS Secrets Manager or another cloud provider's secrets management service. In our example code we used a very small default ttl of 1 minute, but you likely wouldn’t need to refresh the secrets this frequently. This was simply done to speed up my demonstration. Somewhere between 10 and 60 minutes might be a more reasonable default, allowing you to rotate the secret in case of exposure and have it rolled over on all servers in a reasonable amount of time. The sample code from the post can be found over on my GitHub. Happy secret rotating!