Core Datasource

This module contains functionality related to the Core datasource.

Cleaner

BaseCleaner

Bases: ABC, Generic[DocType]

Abstract base class defining document cleaning interface.

Provides interface for cleaning document collections with type safety through generic typing.

Source code in src/embedding/datasources/core/cleaner.py
class BaseCleaner(ABC, Generic[DocType]):
    """Abstract base class defining document cleaning interface.

    Provides interface for cleaning document collections with type safety
    through generic typing.

    Attributes:
        None
    """

    @abstractmethod
    def clean(self, documents: List[DocType]) -> List[DocType]:
        """Clean a list of documents.

        Args:
            documents: List of documents to clean

        Returns:
            List[DocType]: List of cleaned documents
        """
        pass

    @staticmethod
    def _has_empty_content(document: TextNode) -> bool:
        """Check if document content is empty.

        Args:
            document: Document to check for content

        Returns:
            bool: True if document text is empty after stripping whitespace
        """
        return not document.text.strip()

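Because the class is generic in DocType, a concrete cleaner can pin the document type it accepts and let type checkers flag mismatched calls. A minimal hypothetical sketch (the import path is assumed from the source location above):

from typing import List

from llama_index.core.schema import TextNode

from embedding.datasources.core.cleaner import BaseCleaner  # assumed module path


class WhitespaceCleaner(BaseCleaner[TextNode]):
    """Hypothetical cleaner bound to TextNode documents."""

    def clean(self, documents: List[TextNode]) -> List[TextNode]:
        # Keep only nodes whose text is non-empty after stripping whitespace.
        return [d for d in documents if not self._has_empty_content(d)]
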
_has_empty_content(document) staticmethod

Check if document content is empty.

Parameters:
  • document (TextNode) –

    Document to check for content

Returns:
  • bool –

    True if document text is empty after stripping whitespace

Source code in src/embedding/datasources/core/cleaner.py
@staticmethod
def _has_empty_content(document: TextNode) -> bool:
    """Check if document content is empty.

    Args:
        document: Document to check for content

    Returns:
        bool: True if document text is empty after stripping whitespace
    """
    return not document.text.strip()

clean(documents) abstractmethod

Clean a list of documents.

Parameters:
  • documents (List[DocType]) –

    List of documents to clean

Returns:
  • List[DocType]

    List[DocType]: List of cleaned documents

Source code in src/embedding/datasources/core/cleaner.py
@abstractmethod
def clean(self, documents: List[DocType]) -> List[DocType]:
    """Clean a list of documents.

    Args:
        documents: List of documents to clean

    Returns:
        List[DocType]: List of cleaned documents
    """
    pass

Cleaner

Bases: BaseCleaner

Generic document cleaner implementation.

Removes empty documents from collections while tracking progress. Supports any document type with a text attribute.

Source code in src/embedding/datasources/core/cleaner.py
class Cleaner(BaseCleaner):
    """Generic document cleaner implementation.

    Removes empty documents from collections while tracking progress.
    Supports any document type with a text attribute.
    """

    def clean(self, documents: List[Any]) -> List[Any]:
        """Remove empty documents from collection.

        Args:
            documents: List of documents to clean

        Returns:
            List[Any]: Filtered list containing only non-empty documents

        Note:
            Document type is inferred from first document in collection.
        """
        if not documents:
            return []

        # Infer document type from the first document
        document_type_name = type(documents[0]).__name__

        cleaned_documents = []
        for document in self._get_documents_with_tqdm(
            documents, document_type_name
        ):
            if not self._has_empty_content(document):
                cleaned_documents.append(document)

        return cleaned_documents

    def _get_documents_with_tqdm(
        self, documents: List[Any], document_type_name: str
    ):
        """Wrap document iteration with optional progress bar.

        Args:
            documents: List of documents to process
            document_type_name: Name of document type for progress display

        Returns:
            Iterator over documents, optionally wrapped with progress bar
        """
        return tqdm(
            documents, desc=f"[{document_type_name}] Cleaning documents"
        )

    @staticmethod
    def _has_empty_content(document: Any) -> bool:
        """Check if document has empty content.

        Args:
            document: Document to check (must have text attribute)

        Returns:
            bool: True if document text is empty after stripping whitespace
        """
        return not document.text.strip()

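A minimal usage sketch, assuming the module path follows the source location above; any document type with a text attribute works:

from llama_index.core.schema import TextNode

from embedding.datasources.core.cleaner import Cleaner  # assumed module path

documents = [
    TextNode(text="First paragraph with real content."),
    TextNode(text="   "),  # whitespace only, treated as empty
    TextNode(text=""),     # empty, removed as well
]

cleaner = Cleaner()
cleaned = cleaner.clean(documents)  # progress bar reads "[TextNode] Cleaning documents"
assert len(cleaned) == 1
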
_get_documents_with_tqdm(documents, document_type_name)

Wrap document iteration with optional progress bar.

Parameters:
  • documents (List[Any]) –

    List of documents to process

  • document_type_name (str) –

    Name of document type for progress display

Returns:
  • Iterator over documents, optionally wrapped with progress bar

Source code in src/embedding/datasources/core/cleaner.py
def _get_documents_with_tqdm(
    self, documents: List[Any], document_type_name: str
):
    """Wrap document iteration with optional progress bar.

    Args:
        documents: List of documents to process
        document_type_name: Name of document type for progress display

    Returns:
        Iterator over documents, optionally wrapped with progress bar
    """
    return tqdm(
        documents, desc=f"[{document_type_name}] Cleaning documents"
    )

_has_empty_content(document) staticmethod

Check if document has empty content.

Parameters:
  • document (Any) –

    Document to check (must have text attribute)

Returns:
  • bool –

    True if document text is empty after stripping whitespace

Source code in src/embedding/datasources/core/cleaner.py
@staticmethod
def _has_empty_content(document: Any) -> bool:
    """Check if document has empty content.

    Args:
        document: Document to check (must have text attribute)

    Returns:
        bool: True if document text is empty after stripping whitespace
    """
    return not document.text.strip()

clean(documents)

Remove empty documents from collection.

Parameters:
  • documents (List[Any]) –

    List of documents to clean

Returns:
  • List[Any]

    List[Any]: Filtered list containing only non-empty documents

Note

Document type is inferred from first document in collection.

Source code in src/embedding/datasources/core/cleaner.py
def clean(self, documents: List[Any]) -> List[Any]:
    """Remove empty documents from collection.

    Args:
        documents: List of documents to clean

    Returns:
        List[Any]: Filtered list containing only non-empty documents

    Note:
        Document type is inferred from first document in collection.
    """
    if not documents:
        return []

    # Infer document type from the first document
    document_type_name = type(documents[0]).__name__

    cleaned_documents = []
    for document in self._get_documents_with_tqdm(
        documents, document_type_name
    ):
        if not self._has_empty_content(document):
            cleaned_documents.append(document)

    return cleaned_documents

Document

BaseDocument

Bases: Document

Base document class for structured content storage.

Extends LlamaIndex Document to add support for attachments and metadata filtering for embedding and LLM contexts.

Attributes:
  • attachments (Optional[Dict[str, str]]) –

    Dictionary mapping placeholder keys to attachment content

  • included_embed_metadata_keys (List[str]) –

    Metadata fields to include in embeddings

  • included_llm_metadata_keys (List[str]) –

    Metadata fields to include in LLM context

Note

DocType TypeVar ensures type safety when implementing document types. Default metadata includes title and timestamp information.

Source code in src/embedding/datasources/core/document.py
class BaseDocument(Document):
    """Base document class for structured content storage.

    Extends LlamaIndex Document to add support for attachments and
    metadata filtering for embedding and LLM contexts.

    Attributes:
        attachments: Dictionary mapping placeholder keys to attachment content
        included_embed_metadata_keys: Metadata fields to include in embeddings
        included_llm_metadata_keys: Metadata fields to include in LLM context

    Note:
        DocType TypeVar ensures type safety when implementing document types.
        Default metadata includes title and timestamp information.
    """

    attachments: Optional[Dict[str, str]] = Field(
        description="Attachments of the document. Key is the attachment placeholder in raw_content and value is the Attachment object",
        default=None,
    )

    included_embed_metadata_keys: List[str] = [
        "title",
        "created_time",
        "last_edited_time",
    ]

    included_llm_metadata_keys: List[str] = [
        "title",
        "created_time",
        "last_edited_time",
    ]

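A hypothetical construction sketch; field names mirror the class above, while the import path and metadata values are assumptions:

from embedding.datasources.core.document import BaseDocument  # assumed module path

doc = BaseDocument(
    text="Report body with an {{image_1}} placeholder.",
    metadata={
        "title": "Quarterly report",
        "created_time": "2024-01-01T00:00:00Z",
        "last_edited_time": "2024-01-02T00:00:00Z",
    },
    attachments={"{{image_1}}": "https://example.com/chart.png"},
)
# Only the keys listed in included_embed_metadata_keys / included_llm_metadata_keys
# are meant to accompany the text in embedding and LLM contexts.
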
Manager

BaseDatasourceManager

Bases: ABC, Generic[DocType]

Abstract base class for datasource management.

Provides interface for content extraction and vector storage updates.

Attributes:
  • configuration

    Settings for embedding and processing

  • reader

    Component for reading source content

  • cleaner

    Component for cleaning extracted content

  • splitter

    Component for splitting content into chunks

Source code in src/embedding/datasources/core/manager.py
class BaseDatasourceManager(ABC, Generic[DocType]):
    """Abstract base class for datasource management.

    Provides interface for content extraction and vector storage updates.

    Attributes:
        configuration: Settings for embedding and processing
        reader: Component for reading source content
        cleaner: Component for cleaning extracted content
        splitter: Component for splitting content into chunks
    """

    def __init__(
        self,
        configuration: Configuration,
        reader: BaseReader,
        cleaner: BaseCleaner,
        splitter: BaseSplitter,
    ):
        """Initialize datasource manager.

        Args:
            configuration: Embedding and processing settings
            reader: Content extraction component
            cleaner: Content cleaning component
            splitter: Content splitting component
        """
        self.configuration = configuration
        self.reader = reader
        self.cleaner = cleaner
        self.splitter = splitter

    @abstractmethod
    async def extract(
        self,
    ) -> Tuple[List[DocType], List[DocType], List[TextNode]]:
        """Extract and process content from datasource.

        Returns:
            Tuple containing:
                - List of raw documents
                - List of cleaned documents
                - List of text nodes for embedding
        """
        pass

    @abstractmethod
    def update_vector_storage(self):
        """Update vector storage with current embeddings."""
        pass

__init__(configuration, reader, cleaner, splitter)

Initialize datasource manager.

Parameters:
  • configuration (Configuration) –

    Embedding and processing settings

  • reader (BaseReader) –

    Content extraction component

  • cleaner (BaseCleaner) –

    Content cleaning component

  • splitter (BaseSplitter) –

    Content splitting component

Source code in src/embedding/datasources/core/manager.py
def __init__(
    self,
    configuration: Configuration,
    reader: BaseReader,
    cleaner: BaseCleaner,
    splitter: BaseSplitter,
):
    """Initialize datasource manager.

    Args:
        configuration: Embedding and processing settings
        reader: Content extraction component
        cleaner: Content cleaning component
        splitter: Content splitting component
    """
    self.configuration = configuration
    self.reader = reader
    self.cleaner = cleaner
    self.splitter = splitter

extract() abstractmethod async

Extract and process content from datasource.

Returns:
  • Tuple[List[DocType], List[DocType], List[TextNode]]

    Tuple containing:
      - List of raw documents
      - List of cleaned documents
      - List of text nodes for embedding

Source code in src/embedding/datasources/core/manager.py
@abstractmethod
async def extract(
    self,
) -> Tuple[List[DocType], List[DocType], List[TextNode]]:
    """Extract and process content from datasource.

    Returns:
        Tuple containing:
            - List of raw documents
            - List of cleaned documents
            - List of text nodes for embedding
    """
    pass

update_vector_storage() abstractmethod

Update vector storage with current embeddings.

Source code in src/embedding/datasources/core/manager.py
@abstractmethod
def update_vector_storage(self):
    """Update vector storage with current embeddings."""
    pass

DatasourceManager

Bases: BaseDatasourceManager

Manager for datasource content processing and embedding.

Implements content extraction pipeline using configurable components for reading, cleaning, splitting and embedding content.

Source code in src/embedding/datasources/core/manager.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class DatasourceManager(BaseDatasourceManager):
    """Manager for datasource content processing and embedding.

    Implements content extraction pipeline using configurable components
    for reading, cleaning, splitting and embedding content.
    """

    async def extract(
        self,
    ) -> Tuple[List[Document], List[Document], List[TextNode]]:
        """Extract and process content from datasource.

        Returns:
            Tuple containing:
                - List of raw documents
                - List of cleaned documents
                - List of text nodes for embedding
        """
        documents = await self.reader.get_all_documents_async()
        cleaned_documents = self.cleaner.clean(documents)
        nodes = self.splitter.split(cleaned_documents)
        return documents, cleaned_documents, nodes

    def update_vector_storage(self):
        """Update vector storage with current embeddings.

        Raises:
            NotImplementedError: Method must be implemented by subclasses
        """
        raise NotImplementedError

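An end-to-end sketch of the extraction pipeline, assuming the module paths above; the in-memory reader, the None configuration, and the whitespace tokenizer are stand-ins for illustration only:

import asyncio
from typing import List

from llama_index.core.schema import Document

from embedding.datasources.core.cleaner import Cleaner             # assumed module path
from embedding.datasources.core.manager import DatasourceManager   # assumed module path
from embedding.datasources.core.reader import BaseReader           # assumed module path
from embedding.datasources.core.splitter import MarkdownSplitter   # assumed module path


class InMemoryReader(BaseReader[Document]):
    """Hypothetical reader that serves a fixed list of documents."""

    def __init__(self, documents: List[Document]):
        self._documents = documents

    def get_all_documents(self) -> List[Document]:
        return self._documents

    async def get_all_documents_async(self) -> List[Document]:
        return self._documents


docs = [Document(text="# Title\n\nSome markdown body."), Document(text="   ")]
manager = DatasourceManager(
    configuration=None,  # extract() itself only touches reader, cleaner and splitter
    reader=InMemoryReader(docs),
    cleaner=Cleaner(),
    splitter=MarkdownSplitter(
        chunk_size_in_tokens=256,
        chunk_overlap_in_tokens=32,
        tokenize_func=str.split,  # crude whitespace tokenizer, for illustration only
    ),
)
raw_docs, cleaned_docs, nodes = asyncio.run(manager.extract())
# The whitespace-only document is dropped by the cleaner before splitting.
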
extract() async

Extract and process content from datasource.

Returns:
  • Tuple[List[Document], List[Document], List[TextNode]]

    Tuple containing:
      - List of raw documents
      - List of cleaned documents
      - List of text nodes for embedding

Source code in src/embedding/datasources/core/manager.py
async def extract(
    self,
) -> Tuple[List[Document], List[Document], List[TextNode]]:
    """Extract and process content from datasource.

    Returns:
        Tuple containing:
            - List of raw documents
            - List of cleaned documents
            - List of text nodes for embedding
    """
    documents = await self.reader.get_all_documents_async()
    cleaned_documents = self.cleaner.clean(documents)
    nodes = self.splitter.split(cleaned_documents)
    return documents, cleaned_documents, nodes

update_vector_storage()

Update vector storage with current embeddings.

Raises:
  • NotImplementedError

    Method must be implemented by subclasses

Source code in src/embedding/datasources/core/manager.py
def update_vector_storage(self):
    """Update vector storage with current embeddings.

    Raises:
        NotImplementedError: Method must be implemented by subclasses
    """
    raise NotImplementedError

Reader

BaseReader

Bases: ABC, Generic[DocType]

Abstract base class for document source readers.

Defines interface for document extraction from various sources. Supports both synchronous and asynchronous implementations through generic typing for document types.

Source code in src/embedding/datasources/core/reader.py
class BaseReader(ABC, Generic[DocType]):
    """Abstract base class for document source readers.

    Defines interface for document extraction from various sources.
    Supports both synchronous and asynchronous implementations through
    generic typing for document types.

    Attributes:
        None
    """

    @abstractmethod
    def get_all_documents(self) -> List[DocType]:
        """Synchronously retrieve all documents from source.

        Returns:
            List[DocType]: Collection of extracted documents

        Raises:
            NotImplementedError: Must be implemented by concrete classes
        """
        pass

    @abstractmethod
    async def get_all_documents_async(self) -> List[DocType]:
        """Asynchronously retrieve all documents from source.

        Returns:
            List[DocType]: Collection of extracted documents

        Raises:
            NotImplementedError: Must be implemented by concrete classes
        """
        pass

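A hypothetical concrete reader, with the asynchronous variant delegating to the synchronous one via a worker thread; the directory layout and module location are assumptions:

import asyncio
from pathlib import Path
from typing import List

from llama_index.core.schema import Document

from embedding.datasources.core.reader import BaseReader  # assumed module path


class MarkdownDirectoryReader(BaseReader[Document]):
    """Hypothetical reader that loads every .md file under a directory."""

    def __init__(self, root: Path):
        self._root = root

    def get_all_documents(self) -> List[Document]:
        return [
            Document(text=path.read_text(encoding="utf-8"), metadata={"title": path.stem})
            for path in sorted(self._root.glob("*.md"))
        ]

    async def get_all_documents_async(self) -> List[Document]:
        # Run the blocking file I/O in a thread so the event loop stays responsive.
        return await asyncio.to_thread(self.get_all_documents)
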
get_all_documents() abstractmethod

Synchronously retrieve all documents from source.

Returns:
  • List[DocType]

    List[DocType]: Collection of extracted documents

Raises:
  • NotImplementedError

    Must be implemented by concrete classes

Source code in src/embedding/datasources/core/reader.py
@abstractmethod
def get_all_documents(self) -> List[DocType]:
    """Synchronously retrieve all documents from source.

    Returns:
        List[DocType]: Collection of extracted documents

    Raises:
        NotImplementedError: Must be implemented by concrete classes
    """
    pass

get_all_documents_async() abstractmethod async

Asynchronously retrieve all documents from source.

Returns:
  • List[DocType]

    List[DocType]: Collection of extracted documents

Raises:
  • NotImplementedError

    Must be implemented by concrete classes

Source code in src/embedding/datasources/core/reader.py
@abstractmethod
async def get_all_documents_async(self) -> List[DocType]:
    """Asynchronously retrieve all documents from source.

    Returns:
        List[DocType]: Collection of extracted documents

    Raises:
        NotImplementedError: Must be implemented by concrete classes
    """
    pass

Splitter

BaseSplitter

Bases: ABC, Generic[DocType]

Abstract base class for document splitting.

Defines interface for splitting documents into text nodes with generic typing support for document types.

Source code in src/embedding/datasources/core/splitter.py
class BaseSplitter(ABC, Generic[DocType]):
    """Abstract base class for document splitting.

    Defines interface for splitting documents into text nodes with
    generic typing support for document types.
    """

    @abstractmethod
    def split(self, documents: List[DocType]) -> List[TextNode]:
        """Split documents into text nodes.

        Args:
            documents: Collection of documents to split

        Returns:
            List[TextNode]: Collection of text nodes
        """
        pass

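Any splitter that maps documents to TextNode objects satisfies this interface; a hypothetical sentence-based variant built on LlamaIndex's SentenceSplitter (import paths assumed):

from typing import List

from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document, TextNode

from embedding.datasources.core.splitter import BaseSplitter  # assumed module path


class SentenceBasedSplitter(BaseSplitter[Document]):
    """Hypothetical splitter that chunks documents purely by sentence boundaries."""

    def __init__(self, chunk_size: int = 256, chunk_overlap: int = 32):
        self._parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    def split(self, documents: List[Document]) -> List[TextNode]:
        # SentenceSplitter already produces one TextNode per chunk.
        return self._parser.get_nodes_from_documents(documents)
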
split(documents) abstractmethod

Split documents into text nodes.

Parameters:
  • documents (List[DocType]) –

    Collection of documents to split

Returns:
  • List[TextNode]

    List[TextNode]: Collection of text nodes

Source code in src/embedding/datasources/core/splitter.py
@abstractmethod
def split(self, documents: List[DocType]) -> List[TextNode]:
    """Split documents into text nodes.

    Args:
        documents: Collection of documents to split

    Returns:
        List[TextNode]: Collection of text nodes
    """
    pass

MarkdownSplitter

Bases: BaseSplitter

Splitter for markdown documents with token-based chunking.

Splits markdown content into nodes based on document structure and token limits. Supports node merging and splitting to maintain consistent chunk sizes.

Attributes:
  • chunk_size_in_tokens

    Maximum tokens per chunk

  • tokenize_func

    Function to convert text to tokens

  • markdown_node_parser

    Parser for markdown structure

  • sentence_splitter

    Splitter for text chunks

Source code in src/embedding/datasources/core/splitter.py
class MarkdownSplitter(BaseSplitter):
    """Splitter for markdown documents with token-based chunking.

    Splits markdown content into nodes based on document structure and
    token limits. Supports node merging and splitting to maintain
    consistent chunk sizes.

    Attributes:
        chunk_size_in_tokens: Maximum tokens per chunk
        tokenize_func: Function to convert text to tokens
        markdown_node_parser: Parser for markdown structure
        sentence_splitter: Splitter for text chunks
    """

    def __init__(
        self,
        chunk_size_in_tokens: int,
        chunk_overlap_in_tokens: int,
        tokenize_func: Callable,
    ):
        """Initialize markdown splitter.

        Args:
            chunk_size_in_tokens: Maximum tokens per chunk
            chunk_overlap_in_tokens: Token overlap between chunks
            tokenize_func: Function to tokenize text
        """
        self.chunk_size_in_tokens = chunk_size_in_tokens
        self.tokenize_func = tokenize_func

        self.markdown_node_parser = MarkdownNodeParser()
        self.sentence_splitter = SentenceSplitter(
            chunk_size=chunk_size_in_tokens,
            chunk_overlap=chunk_overlap_in_tokens,
            tokenizer=tokenize_func,
        )

    def split(self, documents: List[Document]) -> List[TextNode]:
        """Split markdown documents into text nodes.

        Processes documents through markdown parsing, then adjusts node sizes
        through splitting and merging to match chunk size requirements.

        Args:
            documents: Collection of markdown documents

        Returns:
            List[TextNode]: Collection of processed text nodes
        """
        nodes = []

        for document in documents:
            document_nodes = self.markdown_node_parser.get_nodes_from_documents(
                [document]
            )
            document_nodes = self._split_big_nodes(document_nodes)
            document_nodes = self._merge_small_nodes(document_nodes)
            nodes.extend(document_nodes)

        return nodes

    def _split_big_nodes(
        self, document_nodes: List[TextNode]
    ) -> List[TextNode]:
        """Split oversized nodes into smaller chunks.

        Args:
            document_nodes: Collection of nodes to process

        Returns:
            List[TextNode]: Processed nodes within size limits
        """
        new_document_nodes = []

        for document_node in document_nodes:
            text = document_node.text
            document_node_size = len(self.tokenize_func(text))

            if document_node_size > self.chunk_size_in_tokens:
                document_sub_nodes = self._split_big_node(document_node)
                new_document_nodes.extend(document_sub_nodes)
            else:
                new_document_nodes.append(document_node)

        return new_document_nodes

    def _split_big_node(self, document_node: TextNode) -> List[TextNode]:
        """Split single oversized node into smaller nodes.

        Args:
            document_node: Node to split

        Returns:
            List[TextNode]: Collection of smaller nodes
        """
        text = document_node.text
        sub_texts = self.sentence_splitter.split_text(text)
        sub_nodes = []

        for sub_text in sub_texts:
            sub_node = document_node.model_copy()
            sub_node.id_ = str(uuid.uuid4())
            sub_node.text = sub_text
            sub_nodes.append(sub_node)

        return sub_nodes

    def _merge_small_nodes(
        self, document_nodes: List[TextNode]
    ) -> List[TextNode]:
        """Merge adjacent small nodes into larger chunks.

        Args:
            document_nodes: Collection of nodes to process

        Returns:
            List[TextNode]: Collection of merged nodes
        """
        new_document_nodes = []
        current_node = document_nodes[0]

        for node in document_nodes[1:]:
            current_text = current_node.text
            current_node_size = len(self.tokenize_func(current_text))
            node_text = node.text
            node_size = len(self.tokenize_func(node_text))

            if current_node_size + node_size <= self.chunk_size_in_tokens:
                current_node.text += node.text
            else:
                new_document_nodes.append(current_node)
                current_node = node

        new_document_nodes.append(current_node)
        return new_document_nodes

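A usage sketch with a crude whitespace tokenizer standing in for the embedding model's real tokenizer (module path assumed from the source location above):

from llama_index.core.schema import Document

from embedding.datasources.core.splitter import MarkdownSplitter  # assumed module path

splitter = MarkdownSplitter(
    chunk_size_in_tokens=64,
    chunk_overlap_in_tokens=8,
    tokenize_func=str.split,  # hypothetical stand-in tokenizer
)
nodes = splitter.split(
    [Document(text="# Heading\n\nShort intro.\n\n## Details\n\n" + "word " * 200)]
)
# Oversized sections are split, undersized neighbours are merged back together.
for node in nodes:
    print(len(node.text.split()), node.text[:40])
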
__init__(chunk_size_in_tokens, chunk_overlap_in_tokens, tokenize_func)

Initialize markdown splitter.

Parameters:
  • chunk_size_in_tokens (int) –

    Maximum tokens per chunk

  • chunk_overlap_in_tokens (int) –

    Token overlap between chunks

  • tokenize_func (Callable) –

    Function to tokenize text

Source code in src/embedding/datasources/core/splitter.py
def __init__(
    self,
    chunk_size_in_tokens: int,
    chunk_overlap_in_tokens: int,
    tokenize_func: Callable,
):
    """Initialize markdown splitter.

    Args:
        chunk_size_in_tokens: Maximum tokens per chunk
        chunk_overlap_in_tokens: Token overlap between chunks
        tokenize_func: Function to tokenize text
    """
    self.chunk_size_in_tokens = chunk_size_in_tokens
    self.tokenize_func = tokenize_func

    self.markdown_node_parser = MarkdownNodeParser()
    self.sentence_splitter = SentenceSplitter(
        chunk_size=chunk_size_in_tokens,
        chunk_overlap=chunk_overlap_in_tokens,
        tokenizer=tokenize_func,
    )

_merge_small_nodes(document_nodes)

Merge adjacent small nodes into larger chunks.

Parameters:
  • document_nodes (List[TextNode]) –

    Collection of nodes to process

Returns:
  • List[TextNode]

    List[TextNode]: Collection of merged nodes

Source code in src/embedding/datasources/core/splitter.py
def _merge_small_nodes(
    self, document_nodes: List[TextNode]
) -> List[TextNode]:
    """Merge adjacent small nodes into larger chunks.

    Args:
        document_nodes: Collection of nodes to process

    Returns:
        List[TextNode]: Collection of merged nodes
    """
    new_document_nodes = []
    current_node = document_nodes[0]

    for node in document_nodes[1:]:
        current_text = current_node.text
        current_node_size = len(self.tokenize_func(current_text))
        node_text = node.text
        node_size = len(self.tokenize_func(node_text))

        if current_node_size + node_size <= self.chunk_size_in_tokens:
            current_node.text += node.text
        else:
            new_document_nodes.append(current_node)
            current_node = node

    new_document_nodes.append(current_node)
    return new_document_nodes

_split_big_node(document_node)

Split single oversized node into smaller nodes.

Parameters:
  • document_node (TextNode) –

    Node to split

Returns:
  • List[TextNode]

    List[TextNode]: Collection of smaller nodes

Source code in src/embedding/datasources/core/splitter.py
def _split_big_node(self, document_node: TextNode) -> List[TextNode]:
    """Split single oversized node into smaller nodes.

    Args:
        document_node: Node to split

    Returns:
        List[TextNode]: Collection of smaller nodes
    """
    text = document_node.text
    sub_texts = self.sentence_splitter.split_text(text)
    sub_nodes = []

    for sub_text in sub_texts:
        sub_node = document_node.model_copy()
        sub_node.id_ = str(uuid.uuid4())
        sub_node.text = sub_text
        sub_nodes.append(sub_node)

    return sub_nodes

_split_big_nodes(document_nodes)

Split oversized nodes into smaller chunks.

Parameters:
  • document_nodes (List[TextNode]) –

    Collection of nodes to process

Returns:
  • List[TextNode]

    List[TextNode]: Processed nodes within size limits

Source code in src/embedding/datasources/core/splitter.py
def _split_big_nodes(
    self, document_nodes: List[TextNode]
) -> List[TextNode]:
    """Split oversized nodes into smaller chunks.

    Args:
        document_nodes: Collection of nodes to process

    Returns:
        List[TextNode]: Processed nodes within size limits
    """
    new_document_nodes = []

    for document_node in document_nodes:
        text = document_node.text
        document_node_size = len(self.tokenize_func(text))

        if document_node_size > self.chunk_size_in_tokens:
            document_sub_nodes = self._split_big_node(document_node)
            new_document_nodes.extend(document_sub_nodes)
        else:
            new_document_nodes.append(document_node)

    return new_document_nodes

split(documents)

Split markdown documents into text nodes.

Processes documents through markdown parsing, then adjusts node sizes through splitting and merging to match chunk size requirements.

Parameters:
  • documents (List[Document]) –

    Collection of markdown documents

Returns:
  • List[TextNode]

    List[TextNode]: Collection of processed text nodes

Source code in src/embedding/datasources/core/splitter.py
def split(self, documents: List[Document]) -> List[TextNode]:
    """Split markdown documents into text nodes.

    Processes documents through markdown parsing, then adjusts node sizes
    through splitting and merging to match chunk size requirements.

    Args:
        documents: Collection of markdown documents

    Returns:
        List[TextNode]: Collection of processed text nodes
    """
    nodes = []

    for document in documents:
        document_nodes = self.markdown_node_parser.get_nodes_from_documents(
            [document]
        )
        document_nodes = self._split_big_nodes(document_nodes)
        document_nodes = self._merge_small_nodes(document_nodes)
        nodes.extend(document_nodes)

    return nodes

Builders

MarkdownSplitterBuilder

Builder for creating markdown content splitter instances.

Provides factory method to create configured MarkdownSplitter objects using embedding model settings for chunking parameters.

Source code in src/embedding/datasources/core/builders.py
class MarkdownSplitterBuilder:
    """Builder for creating markdown content splitter instances.

    Provides factory method to create configured MarkdownSplitter objects
    using embedding model settings for chunking parameters.
    """

    @staticmethod
    @inject
    def build(
        embedding_model_configuration: EmbeddingModelConfiguration,
    ) -> MarkdownSplitter:
        """Creates a configured markdown splitter instance.

        Args:
            embedding_model_configuration: Configuration containing tokenization
                and chunking parameters.

        Returns:
            MarkdownSplitter: Configured splitter instance using model's
                chunk size, overlap, and tokenization settings.
        """
        return MarkdownSplitter(
            chunk_size_in_tokens=embedding_model_configuration.splitting.chunk_size_in_tokens,
            chunk_overlap_in_tokens=embedding_model_configuration.splitting.chunk_overlap_in_tokens,
            tokenize_func=embedding_model_configuration.tokenizer_func,
        )

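The builder is normally resolved through dependency injection; below is a hypothetical direct call with a stand-in configuration object that mirrors the attributes accessed in build():

from types import SimpleNamespace

from embedding.datasources.core.builders import MarkdownSplitterBuilder  # assumed module path

# Stand-in for EmbeddingModelConfiguration, exposing only the fields build() reads.
cfg = SimpleNamespace(
    splitting=SimpleNamespace(chunk_size_in_tokens=256, chunk_overlap_in_tokens=32),
    tokenizer_func=str.split,
)
splitter = MarkdownSplitterBuilder.build(embedding_model_configuration=cfg)
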
build(embedding_model_configuration) staticmethod

Creates a configured markdown splitter instance.

Parameters:
  • embedding_model_configuration (EmbeddingModelConfiguration) –

    Configuration containing tokenization and chunking parameters.

Returns:
  • MarkdownSplitter –

    Configured splitter instance using model's chunk size, overlap, and tokenization settings.

Source code in src/embedding/datasources/core/builders.py
@staticmethod
@inject
def build(
    embedding_model_configuration: EmbeddingModelConfiguration,
) -> MarkdownSplitter:
    """Creates a configured markdown splitter instance.

    Args:
        embedding_model_configuration: Configuration containing tokenization
            and chunking parameters.

    Returns:
        MarkdownSplitter: Configured splitter instance using model's
            chunk size, overlap, and tokenization settings.
    """
    return MarkdownSplitter(
        chunk_size_in_tokens=embedding_model_configuration.splitting.chunk_size_in_tokens,
        chunk_overlap_in_tokens=embedding_model_configuration.splitting.chunk_overlap_in_tokens,
        tokenize_func=embedding_model_configuration.tokenizer_func,
    )