Skip to main content

How to extend OPAL with custom Fetch Providers

This tutorial will explain how to write and use your own custom fetch providers, so that OPAL can pull state from a custom service into the authorization layer (i.e: OPA).

note

Before you proceed to implement your own, please check out the list of the available Fetch Providers we already have in place.

The guide has 3 main parts:

  1. Background - explains why we need custom fetch providers, gives examples for use cases and explains what fetch providers are.
  2. Writing your own fetch provider - step-by-step explanation how to write your own fetch provider.
  3. Using a custom fetch provider - given a custom fetch provider (either published by someone else or written by you), shows how to use the provider in your own OPAL setup.

TL;DR

This tutorial is long and detailed, but the gist of it is:

  • All Fetch Providers are simply python classes that derive from BaseFetchProvider.
  • You need to implement the fetching logic in _fetch_() and optionally _process_().
  • Once you finish implementing your provider, you can publish it as a pip package. You can then tell OPAL to use it with the configuration env var OPAL_FETCH_PROVIDER_MODULES.
  • We created a well-documented example fetch provider for Postgres SQL. If you prefer to learn from a real code example you can simply clone it and play with it.

Background

One of the core features of OPAL (besides realtime syncing of authorization state) is the ability to aggregate state from multiple data sources into OPA.

Use cases for fetching authorization state from external sources

  1. We might want to allow certain actions only for paying users. In order to know if the user is a paying user, the authorization layer needs to fetch billing data from a 3rd party service (i.e: Stripe).
  2. We might want to allow a customer success rep to impersonate a user belonging to one of our customers for demo purposes. But only if the customer success rep is assigned a ticket in Salesforce.
  3. In our architecture we have a microservice that manages our custom RBAC roles, we want to pull the list roles and their permissions from the roles service into OPA.

What are OPAL fetch providers?

Fetch Providers are the (pluggable) components OPAL uses to fetch data from sources on demand. You can think about each provider as a plugin or a driver that can teach OPAL how to fetch data from a new data-source.

OPAL was designed to be extensible, and you can easily create more fetch providers to enable OPAL to fetch data from your own unique sources (e.g. a SaaS service, a new DB, your own proprietary solution, etc).

Writing your own fetch provider

In this section we will show a step-by-step tutorial how to write an OPAL fetch provider.

We already created a fully-functional fetch provider for Postgres SQL, that you may use if you need to fetch data from postgres. This fetcher is well documented and you can learn from it how to write your own fetch providers. We will also reference code examples from this fetch provider in our tutorial.

Step 1 - creating your project file hierarchy

All Fetch Providers are simply python classes that derive from BaseFetchProvider.

Fetch Providers are loaded into the fetcher-register from a list of python modules specified by the OPAL configuration env var OPAL_FETCH_PROVIDER_MODULES.

In order for OPAL to be able to load your fetch-provider python module (by load we mean import inside python), the module must be installed on your machine. The best way to install python modules is to publish them as a pip package.

Your minimum file tree should look like this:

.
├── LICENSE
├── README.md
├── opal_fetcher_postgres
├── requirements.txt
├── setup.py

It's pretty basic but we'll go through it anyways:

  • LICENSE - an open-source license. OPAL itself uses the Apache 2.0 license.
  • README.md - a readme that describes your package, typically includes instructions how to install and use your fetch provider (recommended).
  • opal_fetcher_postgres - will be probably have a different name in your own fetch-provider, this is the name of the package after it is installed by pip. (In other words, the import inside python will look like this: import opal_fetcher_postgres).
  • requirements.txt - other pip packages required by your fetch-provider. At minimum, you will need the following packages: opal-common and pydantic.
  • setup.py - This file includes instructions how to install your fetch-provider package. You can copy from us.

Step 2 - your provider module (general structure)

Under your module folder, you should typically have two files:

.
├── __init__.py
└── provider.py

The provider.py module should have the following structure:

  • A class inheriting from FetcherConfig - your config class.
  • A class inheriting from FetchEvent - your event class.
  • A class inheriting from BaseFetchProvider - your fetch provider class.

This example code shows the class structure and some comments:

from pydantic import BaseModel, Field

from opal_common.fetcher.fetch_provider import BaseFetchProvider
from opal_common.fetcher.events import FetcherConfig, FetchEvent


class PostgresFetcherConfig(FetcherConfig):
"""
Config for PostgresFetchProvider, inherits from `FetcherConfig`.
* In your own class, you must set the value of the `fetcher` key to be your custom provider class name.
"""
fetcher: str = "PostgresFetchProvider"


class PostgresFetchEvent(FetchEvent):
"""
When writing a custom provider, you must create a custom FetchEvent subclass, just like this class.
In your own class, you must:
* set the value of the `fetcher` key to be your custom provider class name.
* set the type of the `config` key to be your custom config class (the one just above).
"""
fetcher: str = "PostgresFetchProvider"
config: PostgresFetcherConfig = None


class PostgresFetchProvider(BaseFetchProvider):
"""
The fetch-provider logic, must inherit from `BaseFetchProvider`.
"""
...

You may also reference the provider module from the postgres fetcher.

Step 3 - implementing your FetcherConfig and FetchEvent

Each fetch-provider might require specific values that will be passed to it as part of its configuration. The configuration is simply a Pydantic model that must derive from the FetcherConfig class.

Let's analyze the real code of the PostgresFetcherConfig class from the postgres fetcher.

class PostgresConnectionParams(BaseModel):
"""
if one does not want to pass all postgres arguments in the dsn (in OPAL - the url is the dsn),
one can also use this dict to pass specific arguments.
"""
database: Optional[str] = Field(None, description="the database name")
user: Optional[str] = Field(None, description="user name used to authenticate")
password: Optional[str] = Field(None, description="password used to authenticate")
host: Optional[str] = Field(None, description="database host address (defaults to UNIX socket if not provided)")
port: Optional[str] = Field(None, description="connection port number (defaults to 5432 if not provided)")


class PostgresFetcherConfig(FetcherConfig):
"""
Config for PostgresFetchProvider, instance of `FetcherConfig`.

When an OPAL client receives an update, it contains a list of `DataSourceEntry` objects.
Each `DataSourceEntry` has a `config` key - which is usually an instance of a subclass of `FetcherConfig`.

When writing a custom provider, you must:
- derive your class (inherit) from FetcherConfig
- override the `fetcher` key with your fetcher class name
- (optional): add any fields relevant to a data entry of your fetcher.
- In this example: since we pull data from PostgreSQL - we added a `query` key to hold the SQL query.
"""
fetcher: str = "PostgresFetchProvider"
connection_params: Optional[PostgresConnectionParams] = Field(None, description="these params can override or complement parts of the dsn (connection string)")
query: str = Field(..., description="the query to run against postgres in order to fetch the data")
fetch_one: bool = Field(False, description="whether we fetch only one row from the results of the SELECT query")
  • The PostgresConnectionParams class is simply a sub-model of the main pydantic model. you might not need such a structure in your own implementation.
  • The PostgresFetcherConfig is our actual fetcher config class:
    • The fetcher attribute is mandatory. It must include the name of your fetch-provider class. This is the value that must be later included in your DataSourceEntry objects in order to indicate which fetcher must be used. You can forget about it now, we will explain more when we get to the Using a custom fetch provider section.
    • The other attributes are specific to your fetcher. For example, in the postgres fetcher, the query attribute contains the SQL SELECT query that the fetcher should run against postgres to fetch the data.

Your FetchEvent derived class is more straightforward, simply:

  • Rename the event class to whatever you want.
  • Set the value of the fetcher key to be your custom provider class name.
  • Set the type of the config key to be your custom config class.
class PostgresFetchEvent(FetchEvent):
fetcher: str = "PostgresFetchProvider"
config: PostgresFetcherConfig = None

Step 4 - implementing your FetchProvider class

Your fetch provider class implements the actual logic that is needed to fetch a DataSourceEntry object.

The structure of the provider class is as follows:

class PostgresFetchProvider(BaseFetchProvider):
"""
The fetch-provider logic, must inherit from `BaseFetchProvider`.
"""
...

def __init__(self, event: PostgresFetchEvent) -> None:
"""
inits your provider class
"""

def parse_event(self, event: FetchEvent) -> PostgresFetchEvent:
"""
deserializes the fetch event type from the general `FetchEvent` to your derived fetch event (i.e: `PostgresFetchEvent`)
"""

# if you require context to cleanup or guard resources, you can use __aenter__() and __aexit__()
async def __aenter__(self): ...
async def __aexit__(self, exc_type=None, exc_val=None, tb=None): ...

async def _fetch_(self):
"""
the actual logic that you need to implement to fetch the `DataSourceEntry`.
Can reference your (derived) `FetcherConfig` object to access your fetcher attributes.
"""

async def _process_(self, data):
"""
optional processing of the data returned by _fetch_().
must return a jsonable python object (i.e: an object that can be dumped to json,
e.g: a list or a dict that contains only serializable objects).
"""

Let's reimplement the postgres provider step-by-step.

The constructor

The constructor must be initializd with the specific FetchEvent type you defined in the previous step, and should be propagated with super(). You may also initialize class members.

def __init__(self, event: PostgresFetchEvent) -> None:
...
super().__init__(event)
self._connection: Optional[asyncpg.Connection] = None
self._transaction: Optional[Transaction] = None

The super() method store the event in self._event, and your fetcher configuration can be accessed in self._event.config.

The parse_event() method

Simply replace the custom FetchEvent type (in this example PostgresFetchEvent) with your own custom type. This method simply deserializes the event object from the generic FetchEvent type into the more specific custom event type (i.e: PostgresFetchEvent).

def parse_event(self, event: FetchEvent) -> PostgresFetchEvent:
return PostgresFetchEvent(**event.dict(exclude={"config"}), config=event.config)

Manage a context with __aenter__ and __aexit__

Your fetch provider should typically access the network or disk and be I/O-bound, therefore it is best to use asyncio and typical best practices for writing async python code. That includes:

  • Preferring asyncio-ready libraries instead of blocking libraries to fetch the data.
    • For example in our postgres provider, we use asyncpg instead of the blocking psycopg2.
  • Using __aenter__ and __aexit__ if you need to cleanup resources or guard a context.
    • See more info on async context managers here.

__aenter__ should be typically be used to initialize a connection, create a transcation, etc.

In our postgres example you can see that we connect to the database and start a transaction inside __aenter__. Notice that the transaction itself is an async context manager so we await its own __aenter__:

async def __aenter__(self):
# initializing parameters from the event/config
self._event: PostgresFetchEvent
dsn: str = self._event.url
connection_params: dict = {} if self._event.config.connection_params is None else self._event.config.connection_params.dict(exclude_none=True)

# connect to the postgres database
self._connection: asyncpg.Connection = await asyncpg.connect(dsn, **connection_params)

# start a readonly transaction (we don't want OPAL client writing data due to security!)
self._transaction: Transaction = self._connection.transaction(readonly=True)
await self._transaction.__aenter__()

return self

Similarly __aexit__ should be typically be used to free resources that were allocated inside __aenter__.

In our postgres example:

async def __aexit__(self, exc_type=None, exc_val=None, tb=None):
# End the transaction
if self._transaction is not None:
await self._transaction.__aexit__(exc_type, exc_val, tb)
# Close the connection
if self._connection is not None:
await self._connection.close()

Implementing _fetch_ and _process_

Providers implement a _fetch_() method to access and fetch data from the data-source. They also optionally implement a _process_() method to mutate the data before returning it (for example converting a JSON string to an actual object).

The _fetch_() and _process_() method can access the fields available from self._event (the FetchEvent):

  • The url we should fetch data from is available at self._event.url.
  • The custom FetcherConfig (custom configuration) is available at self._event.config.

In our own example provider _fetch_() simply runs the SQL query and returns the results:

async def _fetch_(self):
self._event: PostgresFetchEvent # type casting

# ...
# there was more code here, it's not very interesting for the tutorial ;)
# ...

if self._event.config.fetch_one:
row = await self._connection.fetchrow(self._event.config.query)
return [row]
else:
return await self._connection.fetch(self._event.config.query)

Since asyncpg returns a list of asyncpg.Record objects, we must process them in _process_ and turn them into something jsonable (the reason is that we currently only support OPA as a policy store, and OPA can only store JSON).

Our _process_() method takes care of the conversion:

async def _process_(self, records: List[asyncpg.Record]):
self._event: PostgresFetchEvent # type casting

# when fetch_one is true, we want to return a dict (and not a list)
if self._event.config.fetch_one:
if records:
# we transform the asyncpg record to a dict that we can be later serialized to json
return dict(records[0])
else:
return {}
else:
# we transform the asyncpg records to a list-of-dicts that we can be later serialized to json
return [dict(record) for record in records]

Bonus: How the process of calling your fetch provider works:

  • The fetch provider is called by the FetchingEngine's fetch_worker.
  • The fetch_worker invokes a provider's .fetch() and .process() methods which are simply proxies to its _fetch_() and _process_() methods.
  • The fetcher-register loads the providers when OPAL client first loads and makes them available for fetch-workers.

Using a custom fetch provider

This section explains how to use a custom OPAL fetch provider in your OPAL setup.

Before we begin - How does OPAL find custom fetch providers?

As mentioned before, all FetchProviders are simply python classes that derive (inherit) from BaseFetchProvider. OPAL searches for fetch providers based on the env var OPAL_FETCH_PROVIDER_MODULES, defined here.

For example, if the env var is:

OPAL_FETCH_PROVIDER_MODULES=opal_common.fetcher.providers,opal_fetcher_postgres.provider

OPAL will parse this var as a comma-separated list, and for each item in the list OPAL will find that python module, import it and then look inside the imported module for subclasses of BaseFetchProvider.

In our example, OPAL will import two python modules:

  1. opal_common.fetcher.providers: there's a trick in the __init__.py file of the module that causes all classes in the directory to be added to __all__ and thus to be available directly under the module. Since both HttpFetchProvider and FastApiRpcFetchProvider inherit from BaseFetchProvider - both of them will be found by OPAL and added to the fetcher register.
  2. opal_fetcher_postgres.provider: no special tricks here. if you look inside that module, you will see that the class PostgresFetchProvider inherits from BaseFetchProvider.

1) Create a custom docker image containing your fetch provider

In the official docker images of OPAL, no custom providers are installed. In order for OPAL to be able to load a custom provider's python module, the python module need to be available on the docker image.

Therefore the first step is to create and build a custom OPAL-client Dockerfile.

Example Dockerfile (taken from the example fetcher repo) - of a non-published python package:

# inherits all behavior defined in the official OPAL-client image
FROM permitio/opal-client:latest
WORKDIR /app/
# These two commands installs the python package from source
COPY . ./
RUN python setup.py install

If your custom provider is published to PyPI (assuming its name is opal-fetcher-postgres), the docker image can be even simpler:

# inherits all behavior defined in the official OPAL-client image
FROM permitio/opal-client:latest
# installs the python package inside the container (from pip / PyPI)
RUN pip install --user opal-fetcher-postgres

2) Build your custom opal-client container

Say your special Dockerfile from step one is called custom_client.Dockerfile.

You must build a customized OPAL container from this Dockerfile, like so:

docker build -t yourcompany/opal-client -f custom_client.Dockerfile .

3) When running OPAL, set OPAL_FETCH_PROVIDER_MODULES

Pass a customized OPAL_FETCH_PROVIDER_MODULES env var to the OPAL client docker container (comma-separated provider modules):

OPAL_FETCH_PROVIDER_MODULES=opal_common.fetcher.providers,opal_fetcher_postgres.provider

Notice that OPAL receives a list from where to search for fetch providers. The list in our case includes the built-in providers (opal_common.fetcher.providers) as well as our custom postgres provider. Naturally, replace opal_fetcher_postgres.provider with your own custom provider if needed.

4) Using the custom provider in your DataSourceEntry objects

Fetchers are triggered when OPAL client is instructed to fetch Data Source Entries. Each entry is a directive what data to fetch, from where, how it should be fetched and how it should be saved into the policy store (i.e: OPA).

Your DataSourceEntry objects can be used either in OPAL_DATA_CONFIG_SOURCES as initial data sources fetched when OPAL client first loads, or in dynamic (realtime) updates sent via the OPAL publish API. There's a guide on hwo to trigger data updates here.

Each DataSourceEntry object has a config attribute which is a dict that matches the schema of FetcherConfig.

The dict included in config should contain:

  • A fetcher attribute that indicates to OPAL that a custom provider should be used to fetch that DataSourceEntry. The fetcher attribute should contain the name of the custom FetchProvider class (the class that derives from BaseFetchProvider).
  • Any custom attributes that your fetcher config type declares (the python class defined in your fetch-provider module that inherits from FetcherConfig). See how to write a custom config in the writing providers section above.

Example value of OPAL_DATA_CONFIG_SOURCES (formatted nicely, but in env var you should pack this to one-line and no-spaces):

{
"config": {
"entries": [
{
"url": "postgresql://postgres@example_db:5432/postgres",
"config": {
"fetcher": "PostgresFetchProvider",
"query": "SELECT * from city;",
"connection_params": {
"password": "postgres"
}
},
"topics": ["policy_data"],
"dst_path": "cities"
}
]
}
}

In the example OPAL_DATA_CONFIG_SOURCES we just shown:

  • The fetcher attributes indicates that in order to fetch the entry, the provider PostgresFetchProvider must be used.
  • The query and connection_params attributes are specific to the PostgresFetchProvider provider, and are defined by the config type PostgresFetcherConfig.

Wrapping this up - check out a docker compose example

This docker compose file contains:

  • A custom opal client based on this Dockerfile. Only difference is that we install the custom fetch-provider python module into the container.
  • The configuration necessary to use the custom fetch provider:
    • OPAL_FETCH_PROVIDER_MODULES is defined for the OPAL-client and tells OPAL to load opal_fetcher_postgres.provider to the fetcher register.
    • OPAL_DATA_CONFIG_SOURCES is defined for the OPAL-server with a DataSourceEntry that contains a fetcher override. The value of the fetcher key tell OPAL to use PostgresFetchProvider to fetch the entry.

you may run this compose file by cloning the example repo and running

docker compose up

Reference - important classes and modules