How to Add a New Datasource Implementation
This guide demonstrates how to add support for a new datasource implementation, using Confluence as an example.
Architecture
Datasources are managed by the DatasourceManager, which aggregates the required components and orchestrates them to retrieve documents, parse them to markdown format, and clean them. Markdown output is strictly required by the embedding process. The general datasource manager flow is:
Reader -> Parser (Optional) -> Cleaner (Optional) -> Splitter (Optional).
Therefore, adding support for a new datasource requires implementing these components and the manager that ties them together.
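As a rough illustration of this flow, the sketch below chains a reader with optional parser, cleaner, and splitter stages. The names run_pipeline and fake_reader, and the use of plain functions as stages, are hypothetical stand-ins chosen for brevity; they are not the project's DatasourceManager API.

```python
from typing import Callable, Iterable, Iterator, List, Optional


def run_pipeline(
    reader: Callable[[], Iterable[str]],
    parser: Optional[Callable[[str], str]] = None,
    cleaner: Optional[Callable[[str], Optional[str]]] = None,
    splitter: Optional[Callable[[str], List[str]]] = None,
) -> Iterator[str]:
    """Hypothetical flow: Reader -> Parser -> Cleaner -> Splitter."""
    for raw in reader():
        doc = parser(raw) if parser else raw
        if cleaner:
            doc = cleaner(doc)
            if doc is None:  # the cleaner may drop a document entirely
                continue
        # Without a splitter, each document is passed through as one chunk.
        yield from (splitter(doc) if splitter else [doc])


def fake_reader() -> List[str]:
    # Assumption: raw pages arrive as small HTML strings.
    return ["<p>Hello</p>", "<p></p>"]


chunks = list(
    run_pipeline(
        fake_reader,
        parser=lambda raw: raw.replace("<p>", "").replace("</p>", ""),
        cleaner=lambda text: text.strip() or None,
    )
)
```

Only the reader is mandatory here; every other stage falls back to a pass-through when it is omitted, which mirrors the optional markers in the flow above.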
Implementation
Step 1: Dependencies
Add the required packages to pyproject.toml under the following section:
[project.optional-dependencies]
extraction = [
    "atlassian-python-api>=3.41.19",
    ...
]
Step 2: Datasource Enum
In datasources.py, add the new datasource to the DatasourceName enumeration:
class DatasourceName(str, Enum):
    ...
    CONFLUENCE = "confluence"
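Because DatasourceName mixes in str, a plain string read from a configuration file can be converted to the enum member and compared with it directly. A minimal, self-contained sketch using a trimmed copy of the enumeration:

```python
from enum import Enum


class DatasourceName(str, Enum):
    # Trimmed copy for illustration; the real enumeration has more members.
    CONFLUENCE = "confluence"


# Look up the member by the value found in a configuration file.
member = DatasourceName("confluence")

# The lookup returns the existing member, not a new object.
is_same_member = member is DatasourceName.CONFLUENCE

# The str mixin makes the member compare equal to the raw string.
equals_raw_string = member == "confluence"
```

An unknown value such as DatasourceName("unknown") raises ValueError, so a typo in the configuration surfaces early.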
Step 3: Datasource Secrets
Create a new directory, src/extraction/datasources/confluence, and add a configuration.py file to it. This file holds the fields and secrets required to set up the datasource.
from typing import Literal, Union

from pydantic import ConfigDict, Field, SecretStr

from core.base_configuration import BaseSecrets
from extraction.bootstrap.configuration.datasources import (
    DatasourceConfiguration,
    DatasourceName,
)


class ConfluenceDatasourceConfiguration(DatasourceConfiguration):
    class Secrets(BaseSecrets):
        model_config = ConfigDict(
            env_file_encoding="utf-8",
            env_prefix="RAG__DATASOURCES__CONFLUENCE__",
            env_nested_delimiter="__",
            extra="ignore",
        )

        username: SecretStr = Field(
            ...,
            description="Username credential used to authenticate with the Confluence instance",
        )
        password: SecretStr = Field(
            ...,
            description="Password credential used to authenticate with the Confluence instance",
        )

    secrets: Secrets = Field(
        None,
        description="Authentication credentials required to access the Confluence instance",
    )
The first part is to create a configuration that extends DatasourceConfiguration. The Secrets inner class defines the secret fields that are read from the environment secrets file under the RAG__DATASOURCES__CONFLUENCE__ prefix. Add the corresponding environment variables to configurations/secrets.{environment}.env:
RAG__DATASOURCES__CONFLUENCE__USERNAME=<confluence_username>
RAG__DATASOURCES__CONFLUENCE__PASSWORD=<confluence_password>
Note: If your datasource doesn't require secrets, you can skip this step.
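To make the prefix convention concrete, the sketch below shows how variables starting with RAG__DATASOURCES__CONFLUENCE__ could be mapped onto the username and password fields. It is a standard-library approximation of the idea, not the actual loading logic behind BaseSecrets; load_prefixed and the sample values are hypothetical.

```python
from typing import Dict, Mapping

PREFIX = "RAG__DATASOURCES__CONFLUENCE__"


def load_prefixed(env: Mapping[str, str], prefix: str = PREFIX) -> Dict[str, str]:
    """Collect variables that start with the prefix, keyed by lowercased field name."""
    return {
        key[len(prefix):].lower(): value
        for key, value in env.items()
        if key.startswith(prefix)
    }


# Hypothetical environment contents, for illustration only.
example_env = {
    "RAG__DATASOURCES__CONFLUENCE__USERNAME": "alice",
    "RAG__DATASOURCES__CONFLUENCE__PASSWORD": "s3cret",
    "UNRELATED": "ignored",
}
secrets = load_prefixed(example_env)
```

In a running process, os.environ could be passed in place of example_env.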
Step 4: Datasource Configuration
Finish the ConfluenceDatasourceConfiguration implementation by adding the remaining fields required for the datasource:
...


class ConfluenceDatasourceConfiguration(DatasourceConfiguration):
    ...
    host: str = Field(
        "127.0.0.1",
        description="Hostname or IP address of the Confluence server instance",
    )
    protocol: Union[Literal["http"], Literal["https"]] = Field(
        "http",
        description="Communication protocol used to connect to the Confluence server",
    )
    name: Literal[DatasourceName.CONFLUENCE] = Field(
        ...,
        description="Identifier specifying this configuration is for a Confluence datasource",
    )

    @property
    def base_url(self) -> str:
        return f"{self.protocol}://{self.host}"
The name field constrains the value to DatasourceName.CONFLUENCE, which serves as an indicator for the pydantic validator.
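One way to picture how a Literal-typed name field can act as an indicator is the sketch below, which picks a configuration class by inspecting each candidate's name annotation. It uses dataclasses and the typing module as a simplified stand-in for pydantic's validation; ConfluenceConfig, OtherConfig, and select_config are hypothetical names, and the real selection logic may work differently.

```python
from dataclasses import dataclass
from typing import Any, Dict, Literal, Sequence, get_args, get_type_hints


@dataclass
class ConfluenceConfig:
    name: Literal["confluence"]
    host: str = "127.0.0.1"


@dataclass
class OtherConfig:  # hypothetical second datasource
    name: Literal["other"]
    path: str = "."


def select_config(raw: Dict[str, Any], candidates: Sequence[type]) -> Any:
    """Instantiate the first candidate whose `name` Literal admits raw['name']."""
    for cls in candidates:
        allowed = get_args(get_type_hints(cls)["name"])
        if raw.get("name") in allowed:
            return cls(**raw)
    raise ValueError(f"No configuration class accepts name={raw.get('name')!r}")


config = select_config(
    {"name": "confluence", "host": "wiki.example.com"},
    [OtherConfig, ConfluenceConfig],
)
```

Since each class admits exactly one name, the order of the candidates does not affect which class is chosen.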
Step 5: Confluence Document
The next step is to create a Confluence document data class in document.py:
from extraction.datasources.core.document import BaseDocument


class ConfluenceDocument(BaseDocument):
    """Document representation for Confluence page content.

    Extends BaseDocument to handle Confluence-specific document processing including
    content extraction, metadata handling, and exclusion configuration.
    """

    pass
In our case, we don't need anything beyond the BaseDocument implementation.
Step 6: Confluence Client
To create a Confluence client, we implement ConfluenceClientFactory in client.py. It extends SingletonFactory, which provides an interface for initializing a single instance for the duration of the application runtime.
from typing import Type

from atlassian import Confluence

from core import SingletonFactory
from extraction.datasources.confluence.configuration import (
    ConfluenceDatasourceConfiguration,
)


class ConfluenceClientFactory(SingletonFactory):
    _configuration_class: Type = ConfluenceDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: ConfluenceDatasourceConfiguration
    ) -> Confluence:
        return Confluence(
            url=configuration.base_url,
            username=configuration.secrets.username.get_secret_value(),
            password=configuration.secrets.password.get_secret_value(),
        )
The _configuration_class field defines the required configuration type. The rest involves implementing the required _create_instance method with the corresponding client initialization.
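For readers unfamiliar with the pattern, a singleton factory along these lines could be sketched as follows. This is an assumption about the general shape, not the source of core.SingletonFactory: SingletonFactorySketch, FakeClient, and FakeClientFactory are hypothetical, and the real class may, for example, cache instances differently.

```python
from typing import Any, Dict


class SingletonFactorySketch:
    """Hypothetical stand-in for core.SingletonFactory."""

    _configuration_class: type = object
    _instances: Dict[type, Any] = {}  # one cached instance per factory class

    @classmethod
    def create(cls, configuration: Any) -> Any:
        if not isinstance(configuration, cls._configuration_class):
            raise TypeError(
                f"Expected {cls._configuration_class.__name__}, "
                f"got {type(configuration).__name__}"
            )
        # Build the instance on the first call and reuse it afterwards.
        if cls not in cls._instances:
            cls._instances[cls] = cls._create_instance(configuration)
        return cls._instances[cls]

    @classmethod
    def _create_instance(cls, configuration: Any) -> Any:
        raise NotImplementedError


class FakeClient:
    def __init__(self, url: str) -> None:
        self.url = url


class FakeClientFactory(SingletonFactorySketch):
    _configuration_class = dict  # a plain dict stands in for the configuration

    @classmethod
    def _create_instance(cls, configuration: dict) -> FakeClient:
        return FakeClient(configuration["url"])


first = FakeClientFactory.create({"url": "https://wiki.example.com"})
second = FakeClientFactory.create({"url": "https://wiki.example.com"})
```

Both calls return the same object, so a client that is expensive to construct is built only once per process.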
Step 7: Datasource Reader
Create a Confluence reader in reader.py that implements the BaseReader interface:
from typing import AsyncIterator

from extraction.datasources.core.reader import BaseReader
...


class ConfluenceDatasourceReader(BaseReader):
    async def read_all_async(
        self,
    ) -> AsyncIterator[dict]:
        # read Confluence pages implementation
        ...
This method returns an asynchronous iterator, so pages can be yielded one at a time instead of being held in memory all at once. Next, implement a factory that defines how the ConfluenceDatasourceReader is initialized:
from core import Factory
...


class ConfluenceDatasourceReaderFactory(Factory):
    _configuration_class = ConfluenceDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: ConfluenceDatasourceConfiguration
    ) -> ConfluenceDatasourceReader:
        client = ConfluenceClientFactory.create(configuration)
        return ConfluenceDatasourceReader(
            configuration=configuration,
            client=client,
        )
Note that instead of initializing the Confluence client directly, the factory uses ConfluenceClientFactory to handle this task.
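The body of read_all_async is left open in this step. One possible shape, sketched under stated assumptions, is an async generator that fetches pages in batches and yields them one by one. FakeConfluenceClient, its get_pages method, and SketchReader are hypothetical; a real reader would call the Confluence client and would likely need error handling.

```python
import asyncio
from typing import AsyncIterator, Dict, List


class FakeConfluenceClient:
    """Hypothetical in-memory client that serves pages in batches."""

    def __init__(self, pages: List[Dict[str, str]]) -> None:
        self._pages = pages

    def get_pages(self, start: int, limit: int) -> List[Dict[str, str]]:
        return self._pages[start : start + limit]


class SketchReader:
    def __init__(self, client: FakeConfluenceClient, batch_size: int = 2) -> None:
        self.client = client
        self.batch_size = batch_size

    async def read_all_async(self) -> AsyncIterator[dict]:
        start = 0
        while True:
            # Run the blocking client call in a worker thread (Python 3.9+).
            batch = await asyncio.to_thread(
                self.client.get_pages, start, self.batch_size
            )
            if not batch:
                break
            for page in batch:
                yield page
            start += len(batch)


async def collect_titles() -> List[str]:
    client = FakeConfluenceClient([{"title": f"Page {i}"} for i in range(5)])
    return [page["title"] async for page in SketchReader(client).read_all_async()]


titles = asyncio.run(collect_titles())
```

At most one batch is held in memory at a time, which is the benefit of returning an iterator rather than a list.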
Step 8: Datasource Parser
In parser.py, implement a parser responsible for converting the raw Confluence page to markdown format:
from extraction.datasources.confluence.configuration import (
    ConfluenceDatasourceConfiguration,
)
from extraction.datasources.confluence.document import ConfluenceDocument
from extraction.datasources.core.parser import BaseParser


class ConfluenceDatasourceParser(BaseParser[ConfluenceDocument]):
    def parse(self, page: str) -> ConfluenceDocument:
        # parse Confluence page implementation
        ...
As before, define a factory for the parser:
class ConfluenceDatasourceParserFactory(Factory):
    _configuration_class: Type = ConfluenceDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: ConfluenceDatasourceConfiguration
    ) -> ConfluenceDatasourceParser:
        return ConfluenceDatasourceParser(configuration)
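The parse method above is left unimplemented. As a minimal sketch of the HTML-to-markdown idea, assuming the page body arrives as a simple HTML string, the standard-library HTMLParser can be used as shown below. The helper html_to_markdown is hypothetical, handles only headings, paragraphs, and list items, drops inline formatting such as bold, and returns a plain string rather than a ConfluenceDocument.

```python
from html.parser import HTMLParser
from typing import List


class _MarkdownBuilder(HTMLParser):
    """Collects markdown blocks from a small subset of HTML tags."""

    _HEADINGS = {"h1": "# ", "h2": "## ", "h3": "### "}

    def __init__(self) -> None:
        super().__init__()
        self.blocks: List[str] = []
        self._prefix = ""
        self._buffer: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag in self._HEADINGS:
            self._prefix = self._HEADINGS[tag]
        elif tag == "li":
            self._prefix = "- "
        elif tag == "p":
            self._prefix = ""

    def handle_data(self, data):
        self._buffer.append(data)

    def handle_endtag(self, tag):
        # Emit one markdown block when a block-level element closes.
        if tag in self._HEADINGS or tag in ("p", "li"):
            text = "".join(self._buffer).strip()
            if text:
                self.blocks.append(self._prefix + text)
            self._buffer = []


def html_to_markdown(html: str) -> str:
    builder = _MarkdownBuilder()
    builder.feed(html)
    builder.close()
    return "\n\n".join(builder.blocks)


markdown = html_to_markdown(
    "<h1>Title</h1><p>Some <b>bold</b> text.</p><ul><li>Item</li></ul>"
)
```

A production parser would more likely rely on a dedicated conversion library; this sketch only shows where such logic would sit inside parse.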
Step 9: Datasource Manager
To orchestrate all the previous components, we will reuse BasicDatasourceManager and implement a factory for it in manager.py:
class ConfluenceDatasourceManagerFactory(Factory):
    """Factory for creating Confluence datasource managers.

    This factory generates managers that handle the extraction of content from
    Confluence instances. It ensures proper configuration, reading, and parsing
    of Confluence content.

    Attributes:
        _configuration_class: Configuration class used for validating and processing
            Confluence-specific settings.
    """

    _configuration_class: Type = ConfluenceDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: ConfluenceDatasourceConfiguration
    ) -> BasicDatasourceManager:
        """Create a configured Confluence datasource manager.

        Sets up the necessary reader and parser components based on the provided
        configuration and assembles them into a functional manager.

        Args:
            configuration: Configuration object containing Confluence-specific
                parameters including authentication details, spaces to extract,
                and other extraction options.

        Returns:
            A fully initialized datasource manager that can extract and process
            data from Confluence.
        """
        reader = ConfluenceDatasourceReaderFactory.create(configuration)
        parser = ConfluenceDatasourceParserFactory.create(configuration)
        return BasicDatasourceManager(configuration, reader, parser)
Following the design pattern, ConfluenceDatasourceManagerFactory uses the reader and parser factories to obtain the instances needed for the manager.
Step 10: Datasource Integration
Create an __init__.py file as follows:
from extraction.bootstrap.configuration.datasources import (
    DatasourceConfigurationRegistry,
    DatasourceName,
)
from extraction.datasources.confluence.configuration import (
    ConfluenceDatasourceConfiguration,
)
from extraction.datasources.confluence.manager import (
    ConfluenceDatasourceManagerFactory,
)
from extraction.datasources.registry import DatasourceManagerRegistry


def register() -> None:
    DatasourceManagerRegistry.register(
        DatasourceName.CONFLUENCE, ConfluenceDatasourceManagerFactory
    )
    DatasourceConfigurationRegistry.register(
        DatasourceName.CONFLUENCE, ConfluenceDatasourceConfiguration
    )
The initialization file includes a register() function responsible for registering our configuration class and manager factory. Registries are used to dynamically inform the system about available implementations. This way, with the following Confluence configuration in the configurations/configuration.{environment}.json file:
"extraction": {
    "datasources": [
        {
            "name": "confluence",
            "host": "wissen.feld-m.de",
            "protocol": "https"
        }
    ]
    ...
}
...
we can dynamically retrieve the corresponding manager implementation by using the name specified in the configuration:
datasource_config = read_datasource_from_config()
datasource_manager = DatasourceManagerRegistry.get(datasource_config.name).create(datasource_config)
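The registry lookup can be pictured with the sketch below. It is an assumption about the general pattern, not the project's DatasourceManagerRegistry: RegistrySketch and FakeManagerFactory are hypothetical, and the sketch uses an instance where the real registries are called at class level.

```python
from typing import Dict, Generic, TypeVar

T = TypeVar("T")


class RegistrySketch(Generic[T]):
    """Hypothetical name-to-implementation registry."""

    def __init__(self) -> None:
        self._entries: Dict[str, T] = {}

    def register(self, name: str, entry: T) -> None:
        if name in self._entries:
            raise ValueError(f"{name!r} is already registered")
        self._entries[name] = entry

    def get(self, name: str) -> T:
        try:
            return self._entries[name]
        except KeyError:
            raise KeyError(f"No implementation registered for {name!r}") from None


class FakeManagerFactory:
    @classmethod
    def create(cls, configuration: dict) -> str:
        # A string stands in for the manager object in this sketch.
        return f"manager for {configuration['host']}"


managers: RegistrySketch[type] = RegistrySketch()
managers.register("confluence", FakeManagerFactory)

config = {"name": "confluence", "host": "wiki.example.com"}
manager = managers.get(config["name"]).create(config)
```

The calling code never imports a Confluence-specific class; it needs only the name from the configuration, which is what lets new datasources be added without touching it.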
This mechanism is later used by DatasourceOrchestrator to initialize the datasources defined in the configuration. These steps conclude the implementation, resulting in the following file structure:
src/
└── extraction/
    └── datasources/
        └── confluence/
            ├── __init__.py
            ├── client.py
            ├── configuration.py
            ├── document.py
            ├── manager.py
            ├── parser.py
            └── reader.py
Notes
Below is the __init__ method of BasicDatasourceManager used in our tutorial:
class BasicDatasourceManager(BaseDatasourceManager, Generic[DocType]):
    def __init__(
        self,
        configuration: ExtractionConfiguration,
        reader: BaseReader,
        parser: BaseParser = BasicMarkdownParser(),
        cleaner: BaseCleaner = BasicMarkdownCleaner(),
        splitter: BaseSplitter = BasicMarkdownSplitter(),
    ):
        ...
Note that in this guide we skipped the implementation of custom cleaner and splitter components, instead using the default ones. When building a new datasource integration, you might need to implement custom versions of these components based on your specific requirements.
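As a starting point for such custom components, the sketch below shows one possible cleaner and splitter operating on markdown strings. The BaseCleaner and BaseSplitter interfaces are not shown in this guide, so the class names, the clean and split method names, and the string-based signatures are all assumptions.

```python
import re
from typing import List, Optional


class SketchCleaner:
    """Hypothetical cleaner: normalizes blank lines and drops empty documents."""

    def clean(self, text: str) -> Optional[str]:
        # Collapse runs of three or more newlines into a single blank line.
        cleaned = re.sub(r"\n{3,}", "\n\n", text).strip()
        return cleaned or None


class SketchSplitter:
    """Hypothetical splitter: packs paragraphs into chunks of limited size."""

    def __init__(self, max_chars: int = 40) -> None:
        self.max_chars = max_chars

    def split(self, text: str) -> List[str]:
        chunks: List[str] = []
        current = ""
        for paragraph in text.split("\n\n"):
            candidate = f"{current}\n\n{paragraph}" if current else paragraph
            if current and len(candidate) > self.max_chars:
                # The next paragraph would overflow, so close the current chunk.
                chunks.append(current)
                current = paragraph
            else:
                current = candidate
        if current:
            chunks.append(current)
        return chunks


cleaned = SketchCleaner().clean("# Title\n\n\n\nFirst paragraph.\n\nSecond paragraph.\n")
chunks = SketchSplitter(max_chars=30).split(cleaned)
```

A paragraph longer than max_chars is kept whole in this sketch; a real splitter would need a policy for that case.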