Core Datasource

This module contains the core building blocks shared by datasource implementations: base classes for documents and for reading, parsing, cleaning, splitting, and managing them.

Cleaner

BaseCleaner

Bases: ABC, Generic[DocType]

Abstract base class for document cleaning operations.

Defines the interface for document cleaners with generic type support to ensure type safety across different document implementations.

Source code in src/extraction/datasources/core/cleaner.py (lines 7-24)
class BaseCleaner(ABC, Generic[DocType]):
    """Abstract base class for document cleaning operations.

    Defines the interface for document cleaners with generic type support
    to ensure type safety across different document implementations.
    """

    @abstractmethod
    def clean(self, document: DocType) -> DocType:
        """Clean a single document.

        Args:
            document: The document to be cleaned

        Returns:
            The cleaned document or None if document should be filtered out
        """
        pass

clean(document) abstractmethod

Clean a single document.

Parameters:
  • document (DocType) –

    The document to be cleaned

Returns:
  • DocType

    The cleaned document or None if document should be filtered out

Source code in src/extraction/datasources/core/cleaner.py (lines 14-24)
@abstractmethod
def clean(self, document: DocType) -> DocType:
    """Clean a single document.

    Args:
        document: The document to be cleaned

    Returns:
        The cleaned document or None if document should be filtered out
    """
    pass
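
For illustration, a concrete cleaner only needs to implement clean. The sketch below is hypothetical: the WhitespaceNormalizingCleaner class is not part of this module, and the import paths assume the package is importable as extraction.datasources.core, matching the src/ layout shown in the source paths. It normalizes whitespace and returns None for empty results, following the filtering contract described above.

from typing import Optional

from extraction.datasources.core.cleaner import BaseCleaner
from extraction.datasources.core.document import BaseDocument


class WhitespaceNormalizingCleaner(BaseCleaner[BaseDocument]):
    """Hypothetical cleaner: strips trailing spaces, drops blank lines and empty documents."""

    def clean(self, document: BaseDocument) -> Optional[BaseDocument]:
        lines = [line.rstrip() for line in document.text.splitlines()]
        cleaned_text = "\n".join(line for line in lines if line)
        if not cleaned_text.strip():
            # Per the BaseCleaner contract, returning None filters the document out.
            return None
        return BaseDocument(text=cleaned_text, metadata=dict(document.metadata))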

BasicMarkdownCleaner

Bases: BaseCleaner, Generic[DocType]

Document cleaner for basic content validation.

Checks for empty content in documents and filters them out. Works with any document type that has a text attribute.

Source code in src/extraction/datasources/core/cleaner.py (lines 27-58)
class BasicMarkdownCleaner(BaseCleaner, Generic[DocType]):
    """Document cleaner for basic content validation.

    Checks for empty content in documents and filters them out.
    Works with any document type that has a text attribute.
    """

    def clean(self, document: DocType) -> DocType:
        """Remove document if it contains empty content.

        Args:
            document: The document to validate

        Returns:
            The original document if content is not empty, None otherwise
        """
        if not self._has_empty_content(document):
            return document

        return None

    @staticmethod
    def _has_empty_content(document: DocType) -> bool:
        """Check if document content is empty.

        Args:
            document: Document to check (must have a text attribute)

        Returns:
            True if document's text is empty or contains only whitespace
        """
        return not document.text.strip()

clean(document)

Remove document if it contains empty content.

Parameters:
  • document (DocType) –

    The document to validate

Returns:
  • DocType

    The original document if content is not empty, None otherwise

Source code in src/extraction/datasources/core/cleaner.py (lines 34-46)
def clean(self, document: DocType) -> DocType:
    """Remove document if it contains empty content.

    Args:
        document: The document to validate

    Returns:
        The original document if content is not empty, None otherwise
    """
    if not self._has_empty_content(document):
        return document

    return None
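
A minimal usage sketch (the example documents are illustrative, and the import paths assume the src/ layout shown in the source paths): documents with whitespace-only text are dropped, everything else passes through unchanged.

from extraction.datasources.core.cleaner import BasicMarkdownCleaner
from extraction.datasources.core.document import BaseDocument

cleaner = BasicMarkdownCleaner()

kept = cleaner.clean(BaseDocument(text="# Title\n\nSome content.", metadata={}))
dropped = cleaner.clean(BaseDocument(text="   \n\t", metadata={}))

assert kept is not None   # non-empty content is returned unchanged
assert dropped is None    # whitespace-only content is filtered out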

Document

BaseDocument

Bases: Document

Base document class for structured content storage.

Extends LlamaIndex Document to add support for attachments and metadata filtering for embedding and LLM contexts.

Attributes:
  • attachments (Optional[Dict[str, str]]) –

    Dictionary mapping placeholder keys to attachment content

  • included_embed_metadata_keys (List[str]) –

    Metadata fields to include in embeddings

  • included_llm_metadata_keys (List[str]) –

    Metadata fields to include in LLM context

Note

DocType TypeVar ensures type safety when implementing document types. Default metadata includes title and timestamp information.

Source code in src/extraction/datasources/core/document.py (lines 9-64)
class BaseDocument(Document):
    """Base document class for structured content storage.

    Extends LlamaIndex Document to add support for attachments and
    metadata filtering for embedding and LLM contexts.

    Attributes:
        attachments: Dictionary mapping placeholder keys to attachment content
        included_embed_metadata_keys: Metadata fields to include in embeddings
        included_llm_metadata_keys: Metadata fields to include in LLM context

    Note:
        DocType TypeVar ensures type safety when implementing document types.
        Default metadata includes title and timestamp information.
    """

    attachments: Optional[Dict[str, str]] = Field(
        description="Document attachments with placeholders as keys and content as values",
        default=None,
    )

    included_embed_metadata_keys: List[str] = [
        "title",
        "created_time",
        "last_edited_time",
    ]

    included_llm_metadata_keys: List[str] = [
        "title",
        "created_time",
        "last_edited_time",
    ]

    def __init__(self, text: str, metadata: dict, attachments: dict = None):
        """Initialize a document with text, metadata, and optional attachments.

        Sets up excluded metadata keys for embedding and LLM contexts.
        """
        super().__init__(text=text, metadata=metadata)
        self.attachments = attachments or {}
        self.excluded_embed_metadata_keys = self._set_excluded_metadata_keys(
            self.metadata, self.included_embed_metadata_keys
        )
        self.excluded_llm_metadata_keys = self._set_excluded_metadata_keys(
            self.metadata, self.included_llm_metadata_keys
        )

    @staticmethod
    def _set_excluded_metadata_keys(
        metadata: dict, included_keys: List[str]
    ) -> List[str]:
        """Identify metadata keys to exclude from processing.

        Returns all keys from metadata that aren't in the included_keys list.
        """
        return [key for key in metadata.keys() if key not in included_keys]

__init__(text, metadata, attachments=None)

Initialize a document with text, metadata, and optional attachments.

Sets up excluded metadata keys for embedding and LLM contexts.

Source code in src/extraction/datasources/core/document.py (lines 42-54)
def __init__(self, text: str, metadata: dict, attachments: dict = None):
    """Initialize a document with text, metadata, and optional attachments.

    Sets up excluded metadata keys for embedding and LLM contexts.
    """
    super().__init__(text=text, metadata=metadata)
    self.attachments = attachments or {}
    self.excluded_embed_metadata_keys = self._set_excluded_metadata_keys(
        self.metadata, self.included_embed_metadata_keys
    )
    self.excluded_llm_metadata_keys = self._set_excluded_metadata_keys(
        self.metadata, self.included_llm_metadata_keys
    )
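
A construction sketch under stated assumptions (the import path follows the src/ layout shown above, and the metadata values and the "{diagram}" placeholder format are purely illustrative): only the keys listed in included_embed_metadata_keys and included_llm_metadata_keys remain visible to the embedding and LLM contexts; every other metadata key is excluded automatically.

from extraction.datasources.core.document import BaseDocument

doc = BaseDocument(
    text="# Release notes\n\nSee the attached diagram: {diagram}",
    metadata={
        "title": "Release notes",
        "created_time": "2024-01-01T00:00:00Z",
        "last_edited_time": "2024-01-02T00:00:00Z",
        "source_url": "https://example.invalid/notes",  # not in the included key lists
    },
    attachments={"{diagram}": "<svg>...</svg>"},
)

# Keys outside the included lists are excluded from both contexts.
assert "source_url" in doc.excluded_embed_metadata_keys
assert "source_url" in doc.excluded_llm_metadata_keys
assert "title" not in doc.excluded_llm_metadata_keys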

Manager

BaseDatasourceManager

Bases: ABC, Generic[DocType]

Abstract base class for datasource management.

Defines the interface for managing the extraction, parsing, cleaning, and splitting of documents from a data source. This class serves as a template for implementing specific datasource managers, ensuring a consistent interface and behavior across different implementations.

Source code in src/extraction/datasources/core/manager.py (lines 20-71)
class BaseDatasourceManager(ABC, Generic[DocType]):
    """Abstract base class for datasource management.

    Defines the interface for managing the extraction, parsing,
    cleaning, and splitting of documents from a data source.
    This class serves as a template for implementing specific
    datasource managers, ensuring a consistent interface and
    behavior across different implementations.
    """

    def __init__(
        self,
        configuration: ExtractionConfiguration,
        reader: BaseReader,
        parser: BaseParser = BasicMarkdownParser(),
        cleaner: BaseCleaner = BasicMarkdownCleaner(),
        splitter: BaseSplitter = BasicMarkdownSplitter(),
    ):
        """Initialize datasource manager.

        Args:
            configuration: Embedding and processing settings
            reader: Content extraction component
            parser: Content parsing component
            cleaner: Content cleaning component
            splitter: Content splitting component
        """
        self.configuration = configuration
        self.reader = reader
        self.parser = parser
        self.cleaner = cleaner
        self.splitter = splitter

    @abstractmethod
    async def full_refresh_sync(
        self,
    ) -> AsyncIterator[DocType]:
        """Extract and process all content from the datasource.

        Returns:
            An async iterator yielding processed document chunks of type DocType
        """
        pass

    @abstractmethod
    def incremental_sync(self):
        """Process only new or changed content from the datasource.

        This method should handle differential updates to avoid
        reprocessing all content when only portions have changed.
        Implementations should update the vector storage accordingly.
        """
        pass

__init__(configuration, reader, parser=BasicMarkdownParser(), cleaner=BasicMarkdownCleaner(), splitter=BasicMarkdownSplitter())

Initialize datasource manager.

Parameters:
  • configuration (ExtractionConfiguration) –

    Embedding and processing settings

  • reader (BaseReader) –

    Content extraction component

  • parser (BaseParser, default: BasicMarkdownParser() ) –

    Content parsing component

  • cleaner (BaseCleaner, default: BasicMarkdownCleaner() ) –

    Content cleaning component

  • splitter (BaseSplitter, default: BasicMarkdownSplitter() ) –

    Content splitting component

Source code in src/extraction/datasources/core/manager.py (lines 30-50)
def __init__(
    self,
    configuration: ExtractionConfiguration,
    reader: BaseReader,
    parser: BaseParser = BasicMarkdownParser(),
    cleaner: BaseCleaner = BasicMarkdownCleaner(),
    splitter: BaseSplitter = BasicMarkdownSplitter(),
):
    """Initialize datasource manager.

    Args:
        configuration: Embedding and processing settings
        reader: Content extraction component
        parser: Content parsing component
        cleaner: Content cleaning component
        splitter: Content splitting component
    """
    self.configuration = configuration
    self.reader = reader
    self.parser = parser
    self.cleaner = cleaner
    self.splitter = splitter

full_refresh_sync() abstractmethod async

Extract and process all content from the datasource.

Returns:
  • AsyncIterator[DocType]

    An async iterator yielding processed document chunks of type DocType

Source code in src/extraction/datasources/core/manager.py (lines 52-61)
@abstractmethod
async def full_refresh_sync(
    self,
) -> AsyncIterator[DocType]:
    """Extract and process all content from the datasource.

    Returns:
        An async iterator yielding processed document chunks of type DocType
    """
    pass

incremental_sync() abstractmethod

Process only new or changed content from the datasource.

This method should handle differential updates to avoid reprocessing all content when only portions have changed. Implementations should update the vector storage accordingly.

Source code in src/extraction/datasources/core/manager.py (lines 63-71)
@abstractmethod
def incremental_sync(self):
    """Process only new or changed content from the datasource.

    This method should handle differential updates to avoid
    reprocessing all content when only portions have changed.
    Implementations should update the vector storage accordingly.
    """
    pass

BasicDatasourceManager

Bases: BaseDatasourceManager, Generic[DocType]

Standard implementation of datasource content processing pipeline.

Handles the extraction, parsing, cleaning, and splitting of documents from a data source. Processes documents using the provided components in a sequential pipeline to prepare them for embedding and storage.

Source code in src/extraction/datasources/core/manager.py (lines 74-113)
class BasicDatasourceManager(BaseDatasourceManager, Generic[DocType]):
    """Standard implementation of datasource content processing pipeline.

    Handles the extraction, parsing, cleaning, and splitting of documents
    from a data source. Processes documents using the provided components
    in a sequential pipeline to prepare them for embedding and storage.
    """

    async def full_refresh_sync(
        self,
    ) -> AsyncIterator[DocType]:
        """Process all content from the datasource from scratch.

        Executes the complete pipeline:
        1. Reads source objects asynchronously
        2. Parses each object into a document
        3. Cleans the content
        4. Splits into appropriate chunks

        Returns:
            An async iterator yielding processed document chunks of type DocType
        """
        objects = self.reader.read_all_async()
        async for object in objects:
            md_document = self.parser.parse(object)
            cleaned_document = self.cleaner.clean(md_document)
            if cleaned_document:
                for split_document in self.splitter.split(cleaned_document):
                    yield split_document

    def incremental_sync(self):
        """Process only new or changed content since the last sync.

        Should be implemented by subclasses to provide efficient
        updates when only a portion of the datasource has changed.

        Raises:
            NotImplementedError: This feature is not yet implemented
        """
        raise NotImplementedError("Currently unsupported feature.")

full_refresh_sync() async

Process all content from the datasource from scratch.

Executes the complete pipeline:

  1. Reads source objects asynchronously
  2. Parses each object into a document
  3. Cleans the content
  4. Splits into appropriate chunks

Returns:
  • AsyncIterator[DocType]

    An async iterator yielding processed document chunks of type DocType

Source code in src/extraction/datasources/core/manager.py (lines 82-102)
async def full_refresh_sync(
    self,
) -> AsyncIterator[DocType]:
    """Process all content from the datasource from scratch.

    Executes the complete pipeline:
    1. Reads source objects asynchronously
    2. Parses each object into a document
    3. Cleans the content
    4. Splits into appropriate chunks

    Returns:
        An async iterator yielding processed document chunks of type DocType
    """
    objects = self.reader.read_all_async()
    async for object in objects:
        md_document = self.parser.parse(object)
        cleaned_document = self.cleaner.clean(md_document)
        if cleaned_document:
            for split_document in self.splitter.split(cleaned_document):
                yield split_document

incremental_sync()

Process only new or changed content since the last sync.

Should be implemented by subclasses to provide efficient updates when only a portion of the datasource has changed.

Raises:
  • NotImplementedError

    This feature is not yet implemented

Source code in src/extraction/datasources/core/manager.py (lines 104-113)
def incremental_sync(self):
    """Process only new or changed content since the last sync.

    Should be implemented by subclasses to provide efficient
    updates when only a portion of the datasource has changed.

    Raises:
        NotImplementedError: This feature is not yet implemented
    """
    raise NotImplementedError("Currently unsupported feature.")
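
A minimal end-to-end sketch of the pipeline. The InMemoryReader class is hypothetical, the import paths assume the src/ layout shown above, and configuration is passed as None only to keep the sketch small; a real manager receives an ExtractionConfiguration instance.

import asyncio
from typing import Any, AsyncIterator, List

from extraction.datasources.core.manager import BasicDatasourceManager
from extraction.datasources.core.reader import BaseReader


class InMemoryReader(BaseReader):
    """Hypothetical reader that streams a few markdown strings."""

    def __init__(self, pages: List[str]):
        self.pages = pages

    async def read_all_async(self) -> AsyncIterator[Any]:
        for page in self.pages:
            yield page


async def main() -> None:
    reader = InMemoryReader(["# First page\n\nContent.", "   "])
    # configuration=None is a shortcut for this sketch; the basic pipeline shown
    # above does not consult it.
    manager = BasicDatasourceManager(configuration=None, reader=reader)
    async for chunk in manager.full_refresh_sync():
        print(chunk.text)  # only the first page survives the cleaner


asyncio.run(main())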

Parser

BaseParser

Bases: ABC, Generic[DocType]

Abstract base class for document parsers.

Defines the interface for parsing content into documents of specified type (DocType).

Source code in src/extraction/datasources/core/parser.py (lines 9-28)
class BaseParser(ABC, Generic[DocType]):
    """
    Abstract base class for document parsers.

    Defines the interface for parsing content into documents
    of specified type (DocType).
    """

    @abstractmethod
    def parse(self, content: str) -> DocType:
        """
        Parse content into a document of type DocType.

        Args:
            content: Raw content string to be parsed

        Returns:
            Parsed document of type DocType
        """
        pass

parse(content) abstractmethod

Parse content into a document of type DocType.

Parameters:
  • content (str) –

    Raw content string to be parsed

Returns:
  • DocType

    Parsed document of type DocType

Source code in src/extraction/datasources/core/parser.py (lines 17-28)
@abstractmethod
def parse(self, content: str) -> DocType:
    """
    Parse content into a document of type DocType.

    Args:
        content: Raw content string to be parsed

    Returns:
        Parsed document of type DocType
    """
    pass
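
A custom parser typically maps whatever the reader yields onto a document with useful metadata. The JSONPageParser below is hypothetical (it is not part of this module, and the import paths assume the src/ layout shown above); it parses a JSON-encoded page into a BaseDocument.

import json

from extraction.datasources.core.document import BaseDocument
from extraction.datasources.core.parser import BaseParser


class JSONPageParser(BaseParser[BaseDocument]):
    """Hypothetical parser: turns a JSON-encoded page into a BaseDocument."""

    def parse(self, content: str) -> BaseDocument:
        record = json.loads(content)
        return BaseDocument(
            text=record.get("body", ""),
            metadata={"title": record.get("title", "untitled")},
        )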

BasicMarkdownParser

Bases: BaseParser[Document]

Markdown parser that converts markdown text into Document objects.

Implements the BaseParser interface for basic markdown content.

Source code in src/extraction/datasources/core/parser.py (lines 31-48)
class BasicMarkdownParser(BaseParser[Document]):
    """
    Markdown parser that converts markdown text into Document objects.

    Implements the BaseParser interface for basic markdown content.
    """

    def parse(self, markdown: str) -> Document:
        """
        Parse markdown content into a Document object.

        Args:
            markdown: Markdown content to be parsed

        Returns:
            Document object containing the markdown text
        """
        return Document(text=markdown, metadata={})

parse(markdown)

Parse markdown content into a Document object.

Parameters:
  • markdown (str) –

    Markdown content to be parsed

Returns:
  • Document

    Document object containing the markdown text

Source code in src/extraction/datasources/core/parser.py (lines 38-48)
def parse(self, markdown: str) -> Document:
    """
    Parse markdown content into a Document object.

    Args:
        markdown: Markdown content to be parsed

    Returns:
        Document object containing the markdown text
    """
    return Document(text=markdown, metadata={})
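
In practice this parser simply wraps the raw markdown (a short sketch; the import path assumes the src/ layout shown above):

from extraction.datasources.core.parser import BasicMarkdownParser

document = BasicMarkdownParser().parse("# Title\n\nSome markdown body.")
assert document.text.startswith("# Title")
assert document.metadata == {}  # this parser attaches no metadata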

Reader

BaseReader

Bases: ABC

Abstract base class for document source readers.

This class defines a standard interface for extracting documents from various data sources. Concrete implementations should inherit from this class and implement the required methods to handle specific data source types.

The generic typing allows for flexibility in the document types returned by different implementations.

Source code in src/extraction/datasources/core/reader.py (lines 5-30)
class BaseReader(ABC):
    """Abstract base class for document source readers.

    This class defines a standard interface for extracting documents from various
    data sources. Concrete implementations should inherit from this class and
    implement the required methods to handle specific data source types.

    The generic typing allows for flexibility in the document types returned
    by different implementations.
    """

    @abstractmethod
    async def read_all_async(self) -> AsyncIterator[Any]:
        """Asynchronously retrieve documents from the source.

        Implementations should use async iteration to efficiently stream documents
        from the source without loading all content into memory at once.

        Returns:
            AsyncIterator[Any]: An async iterator that yields documents as they're
                               extracted from the source.

        Raises:
            NotImplementedError: This abstract method must be implemented by subclasses.
        """
        pass

read_all_async() abstractmethod async

Asynchronously retrieve documents from the source.

Implementations should use async iteration to efficiently stream documents from the source without loading all content into memory at once.

Returns:
  • AsyncIterator[Any]

    AsyncIterator[Any]: An async iterator that yields documents as they're extracted from the source.

Raises:
  • NotImplementedError

    This abstract method must be implemented by subclasses.

Source code in src/extraction/datasources/core/reader.py (lines 16-30)
@abstractmethod
async def read_all_async(self) -> AsyncIterator[Any]:
    """Asynchronously retrieve documents from the source.

    Implementations should use async iteration to efficiently stream documents
    from the source without loading all content into memory at once.

    Returns:
        AsyncIterator[Any]: An async iterator that yields documents as they're
                           extracted from the source.

    Raises:
        NotImplementedError: This abstract method must be implemented by subclasses.
    """
    pass
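
A reader implementation is usually an async generator. The MarkdownDirectoryReader below is a hypothetical sketch (not part of this module; import paths assume the src/ layout shown above) that streams local markdown files one at a time; a production reader would also use non-blocking I/O rather than read_text.

from pathlib import Path
from typing import Any, AsyncIterator

from extraction.datasources.core.reader import BaseReader


class MarkdownDirectoryReader(BaseReader):
    """Hypothetical reader: streams markdown files from a local directory."""

    def __init__(self, root: str):
        self.root = Path(root)

    async def read_all_async(self) -> AsyncIterator[Any]:
        # Yielding one file at a time lets callers consume the source without
        # loading the whole directory into memory.
        for path in sorted(self.root.glob("*.md")):
            yield path.read_text(encoding="utf-8")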

Splitter

BaseSplitter

Bases: ABC, Generic[DocType]

Abstract base class for document splitters.

This class defines the interface for splitting documents into smaller chunks. All splitter implementations should inherit from this class.

Source code in src/extraction/datasources/core/splitter.py (lines 7-24)
class BaseSplitter(ABC, Generic[DocType]):
    """Abstract base class for document splitters.

    This class defines the interface for splitting documents into smaller chunks.
    All splitter implementations should inherit from this class.
    """

    @abstractmethod
    def split(self, document: DocType) -> List[DocType]:
        """Split a document into multiple smaller documents.

        Args:
            document: The document to be split.

        Returns:
            A list of document chunks.
        """
        pass

split(document) abstractmethod

Split a document into multiple smaller documents.

Parameters:
  • document (DocType) –

    The document to be split.

Returns:
  • List[DocType]

    A list of document chunks.

Source code in src/extraction/datasources/core/splitter.py (lines 14-24)
@abstractmethod
def split(self, document: DocType) -> List[DocType]:
    """Split a document into multiple smaller documents.

    Args:
        document: The document to be split.

    Returns:
        A list of document chunks.
    """
    pass
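
A concrete splitter decides how a document becomes retrieval-sized chunks. The HeadingSplitter below is a hypothetical sketch (not part of this module; import paths assume the src/ layout shown above) that emits one chunk per top-level markdown heading, in contrast to the pass-through BasicMarkdownSplitter documented next.

from typing import List

from extraction.datasources.core.document import BaseDocument
from extraction.datasources.core.splitter import BaseSplitter


class HeadingSplitter(BaseSplitter[BaseDocument]):
    """Hypothetical splitter: one chunk per top-level markdown heading."""

    def split(self, document: BaseDocument) -> List[BaseDocument]:
        chunks: List[BaseDocument] = []
        current: List[str] = []
        for line in document.text.splitlines():
            if line.startswith("# ") and current:
                # Close the previous chunk when a new top-level heading starts.
                chunks.append(
                    BaseDocument(text="\n".join(current), metadata=dict(document.metadata))
                )
                current = []
            current.append(line)
        if current:
            chunks.append(
                BaseDocument(text="\n".join(current), metadata=dict(document.metadata))
            )
        return chunks or [document]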

BasicMarkdownSplitter

Bases: BaseSplitter, Generic[DocType]

A simple splitter implementation that returns the document as-is.

This splitter does not perform any actual splitting and is primarily used as a pass-through when splitting is not required.

Source code in src/extraction/datasources/core/splitter.py (lines 27-43)
class BasicMarkdownSplitter(BaseSplitter, Generic[DocType]):
    """A simple splitter implementation that returns the document as-is.

    This splitter does not perform any actual splitting and is primarily
    used as a pass-through when splitting is not required.
    """

    def split(self, document: DocType) -> List[DocType]:
        """Return the document as a single-item list without splitting.

        Args:
            document: The document to be processed.

        Returns:
            A list containing the original document as the only element.
        """
        return [document]

split(document)

Return the document as a single-item list without splitting.

Parameters:
  • document (DocType) –

    The document to be processed.

Returns:
  • List[DocType]

    A list containing the original document as the only element.

Source code in src/extraction/datasources/core/splitter.py (lines 34-43)
def split(self, document: DocType) -> List[DocType]:
    """Return the document as a single-item list without splitting.

    Args:
        document: The document to be processed.

    Returns:
        A list containing the original document as the only element.
    """
    return [document]