Notion Datasource

This module contains functionality related to the Notion datasource.

Cleaner

NotionCleaner

Bases: BaseCleaner

Cleaner for Notion document content.

Implements cleaning logic for Notion databases and pages, removing HTML tags and comments while preserving meaningful content.

Note

Expects documents to be in markdown format.

Source code in src/embedding/datasources/notion/cleaner.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class NotionCleaner(BaseCleaner):
    """Cleaner that normalizes the markdown text of Notion documents.

    Strips HTML comments and converts leftover HTML tags in database and page
    documents, then drops documents whose content is empty.

    Note:
        Document text is expected to be markdown.
    """

    def clean(self, documents: List[NotionDocument]) -> List[NotionDocument]:
        """Clean each document and keep those that still have content.

        Args:
            documents: Notion documents to process. The ``text`` of database
                and page documents is overwritten in place.

        Returns:
            List[NotionDocument]: Documents for which ``_has_empty_content``
            is false, in their original order.
        """
        kept = []

        for doc in NotionCleaner._get_documents_with_tqdm(documents):
            kind = doc.extra_info["type"]
            if kind == "database":
                doc.text = self._clean_database(doc)
            elif kind == "page":
                doc.text = self._clean_page(doc)

            # Documents of any other type are kept untouched, but are still
            # subject to the emptiness filter below.
            if NotionCleaner._has_empty_content(doc):
                continue
            kept.append(doc)

        return kept

    def _clean_database(self, document: NotionDocument) -> str:
        """Return the cleaned text of a database document.

        Args:
            document: Database document whose ``text`` is cleaned

        Returns:
            str: Text after HTML comment removal and tag conversion
        """
        raw_text = document.text
        return NotionCleaner._parse_html_in_markdown(raw_text)

    def _clean_page(self, document: NotionDocument) -> str:
        """Return the cleaned text of a page document.

        Args:
            document: Page document whose ``text`` is cleaned

        Returns:
            str: Text after HTML comment removal and tag conversion
        """
        raw_text = document.text
        return NotionCleaner._parse_html_in_markdown(raw_text)

    @staticmethod
    def _parse_html_in_markdown(md_text: str) -> str:
        """Strip HTML comments and convert HTML tags found in markdown.

        Every ``<...>`` span is parsed with BeautifulSoup and passed through
        ``md``; a converted span without any ASCII letter or digit is replaced
        by an empty string.

        Args:
            md_text: Markdown text that may contain HTML

        Returns:
            str: Text with comments removed and tags converted
        """

        def replace_html(match):
            converted = md(str(BeautifulSoup(match.group(0), "html.parser")))
            return converted if re.search(r"[a-zA-Z0-9]", converted) else ""

        # Comments go first so their delimiters are not treated as tags.
        without_comments = re.sub(r"<!--.*?-->", "", md_text, flags=re.DOTALL)
        tag_pattern = re.compile(r"<.*?>", re.DOTALL)
        return tag_pattern.sub(replace_html, without_comments)

    @staticmethod
    def _get_documents_with_tqdm(documents: List[NotionDocument]):
        """Wrap the documents in a tqdm progress bar.

        Args:
            documents: Collection of documents to iterate over

        Returns:
            The ``tqdm`` wrapper around ``documents``
        """
        progress_label = "[Notion] Cleaning documents"
        return tqdm(documents, desc=progress_label)

_clean_database(document)

Clean Notion database content.

Parameters:
  • document (NotionDocument) –

    Database document to clean

Returns:
  • str( str ) –

    Cleaned database content

Source code in src/embedding/datasources/notion/cleaner.py
46
47
48
49
50
51
52
53
54
55
def _clean_database(self, document: NotionDocument) -> str:
    """Return the cleaned text of a database document.

    Args:
        document: Database document whose ``text`` is cleaned

    Returns:
        str: Result of ``NotionCleaner._parse_html_in_markdown`` on the text
    """
    raw_text = document.text
    return NotionCleaner._parse_html_in_markdown(raw_text)

_clean_page(document)

Clean Notion page content.

Parameters:
  • document (NotionDocument) –

    Page document to clean

Returns:
  • str( str ) –

    Cleaned page content

Source code in src/embedding/datasources/notion/cleaner.py
57
58
59
60
61
62
63
64
65
66
def _clean_page(self, document: NotionDocument) -> str:
    """Return the cleaned text of a page document.

    Args:
        document: Page document whose ``text`` is cleaned

    Returns:
        str: Result of ``NotionCleaner._parse_html_in_markdown`` on the text
    """
    raw_text = document.text
    return NotionCleaner._parse_html_in_markdown(raw_text)

_get_documents_with_tqdm(documents) staticmethod

Wrap document iteration in a tqdm progress bar.

Parameters:
  • documents (List[NotionDocument]) –

    Collection of documents to process

Returns:
  • Iterator over documents, optionally with progress bar

Source code in src/embedding/datasources/notion/cleaner.py
 97
 98
 99
100
101
102
103
104
105
106
107
@staticmethod
def _get_documents_with_tqdm(documents: List[NotionDocument]):
    """Wrap the documents in a tqdm progress bar.

    Args:
        documents: Collection of documents to iterate over

    Returns:
        The ``tqdm`` wrapper around ``documents``
    """
    progress_label = "[Notion] Cleaning documents"
    return tqdm(documents, desc=progress_label)

_parse_html_in_markdown(md_text) staticmethod

Process HTML elements within markdown content.

Converts HTML to markdown and removes content without alphanumeric characters.

Parameters:
  • md_text (str) –

    Text containing markdown and HTML

Returns:
  • str( str ) –

    Cleaned markdown text

Note

Uses BeautifulSoup for HTML parsing

Source code in src/embedding/datasources/notion/cleaner.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@staticmethod
def _parse_html_in_markdown(md_text: str) -> str:
    """Strip HTML comments and convert HTML tags found in markdown.

    Every ``<...>`` span is parsed with BeautifulSoup and passed through
    ``md``; a converted span without any ASCII letter or digit is replaced
    by an empty string.

    Args:
        md_text: Markdown text that may contain HTML

    Returns:
        str: Text with comments removed and tags converted
    """

    def replace_html(match):
        converted = md(str(BeautifulSoup(match.group(0), "html.parser")))
        return converted if re.search(r"[a-zA-Z0-9]", converted) else ""

    # Comments go first so their delimiters are not treated as tags.
    without_comments = re.sub(r"<!--.*?-->", "", md_text, flags=re.DOTALL)
    tag_pattern = re.compile(r"<.*?>", re.DOTALL)
    return tag_pattern.sub(replace_html, without_comments)

clean(documents)

Clean a collection of Notion documents.

Processes both databases and pages, removing HTML artifacts and empty content.

Parameters:
  • documents (List[NotionDocument]) –

    Collection of Notion documents to clean

Returns:
  • List[NotionDocument]

    List[NotionDocument]: Filtered and cleaned documents

Source code in src/embedding/datasources/notion/cleaner.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def clean(self, documents: List[NotionDocument]) -> List[NotionDocument]:
    """Clean each document and keep those that still have content.

    Args:
        documents: Notion documents to process. The ``text`` of database
            and page documents is overwritten in place.

    Returns:
        List[NotionDocument]: Documents for which ``_has_empty_content``
        is false, in their original order.
    """
    kept = []

    for doc in NotionCleaner._get_documents_with_tqdm(documents):
        kind = doc.extra_info["type"]
        if kind == "database":
            doc.text = self._clean_database(doc)
        elif kind == "page":
            doc.text = self._clean_page(doc)

        # Documents of any other type are kept untouched, but are still
        # subject to the emptiness filter below.
        if NotionCleaner._has_empty_content(doc):
            continue
        kept.append(doc)

    return kept

Document

NotionDocument

Bases: BaseDocument

Document representation for Notion page content.

Extends BaseDocument to handle Notion-specific document processing including metadata handling and filtering for embeddings and LLM contexts.

Attributes:
  • attachments

    Dictionary of document attachments

  • text

    Document content in markdown format

  • metadata

    Extracted page metadata including dates and source info

  • excluded_embed_metadata_keys

    Metadata keys to exclude from embeddings

  • excluded_llm_metadata_keys

    Metadata keys to exclude from LLM context

Source code in src/embedding/datasources/notion/document.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class NotionDocument(BaseDocument):
    """Document representation for Notion page content.

    Extends BaseDocument to handle Notion-specific document processing including
    metadata handling and filtering for embeddings and LLM contexts.

    Attributes:
        attachments: Dictionary of document attachments
        text: Document content in markdown format
        metadata: Extracted page metadata including dates and source info
        excluded_embed_metadata_keys: Metadata keys to exclude from embeddings
        excluded_llm_metadata_keys: Metadata keys to exclude from LLM context
    """

    @classmethod
    def from_page(cls, metadata: dict, text: str) -> "NotionDocument":
        """Create NotionDocument instance from page data.

        Args:
            metadata: Dictionary containing page metadata; must provide
                ``created_time`` and ``last_edited_time`` strings. It is not
                modified.
            text: Extracted page content

        Returns:
            NotionDocument: Configured document instance
        """
        document = cls(
            attachments={},
            text=text,
            metadata=NotionDocument._get_metadata(metadata),
        )
        document._set_excluded_embed_metadata_keys()
        document._set_excluded_llm_metadata_keys()
        return document

    def _set_excluded_embed_metadata_keys(self) -> None:
        """Configure metadata keys to exclude from embeddings.

        Every metadata key that is not listed in
        ``included_embed_metadata_keys`` is marked for exclusion.
        """
        self.excluded_embed_metadata_keys = [
            key
            for key in self.metadata
            if key not in self.included_embed_metadata_keys
        ]

    def _set_excluded_llm_metadata_keys(self) -> None:
        """Configure metadata keys to exclude from LLM context.

        Every metadata key that is not listed in
        ``included_llm_metadata_keys`` is marked for exclusion.
        """
        self.excluded_llm_metadata_keys = [
            key
            for key in self.metadata
            if key not in self.included_llm_metadata_keys
        ]

    @staticmethod
    def _get_metadata(metadata: dict) -> dict:
        """Process and enhance page metadata.

        Args:
            metadata: Raw page metadata dictionary with ``created_time`` and
                ``last_edited_time`` strings

        Returns:
            dict: Shallow copy of ``metadata`` extended with ``datasource``,
            ``created_date`` and ``last_edited_date``

        Raises:
            KeyError: If ``created_time`` or ``last_edited_time`` is missing
        """
        # Enrich a copy so the caller's dictionary is left untouched.
        enriched = dict(metadata)
        enriched["datasource"] = "notion"
        # Keep only the part before the "T" separator of each timestamp.
        enriched["created_date"] = metadata["created_time"].split("T")[0]
        enriched["last_edited_date"] = metadata["last_edited_time"].split("T")[0]
        return enriched

_get_metadata(metadata) staticmethod

Process and enhance page metadata.

Parameters:
  • metadata (dict) –

    Raw page metadata dictionary

Returns:
  • dict( dict ) –

    Enhanced metadata including source and formatted dates

Source code in src/embedding/datasources/notion/document.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@staticmethod
def _get_metadata(metadata: dict) -> dict:
    """Process and enhance page metadata.

    Args:
        metadata: Raw page metadata dictionary with ``created_time`` and
            ``last_edited_time`` strings

    Returns:
        dict: Shallow copy of ``metadata`` extended with ``datasource``,
        ``created_date`` and ``last_edited_date``

    Raises:
        KeyError: If ``created_time`` or ``last_edited_time`` is missing
    """
    # Enrich a copy so the caller's dictionary is left untouched.
    enriched = dict(metadata)
    enriched["datasource"] = "notion"
    # Keep only the part before the "T" separator of each timestamp.
    enriched["created_date"] = metadata["created_time"].split("T")[0]
    enriched["last_edited_date"] = metadata["last_edited_time"].split("T")[0]
    return enriched

_set_excluded_embed_metadata_keys()

Configure metadata keys to exclude from embeddings.

Identifies metadata keys not explicitly included in embedding processing and marks them for exclusion.

Source code in src/embedding/datasources/notion/document.py
38
39
40
41
42
43
44
45
46
47
48
49
def _set_excluded_embed_metadata_keys(self) -> None:
    """Mark metadata keys that must not be used for embeddings.

    Every key of ``self.metadata`` that is absent from
    ``self.included_embed_metadata_keys`` is stored, in metadata order, in
    ``self.excluded_embed_metadata_keys``.
    """
    excluded = []
    for name in self.metadata:
        if name in self.included_embed_metadata_keys:
            continue
        excluded.append(name)
    self.excluded_embed_metadata_keys = excluded

_set_excluded_llm_metadata_keys()

Configure metadata keys to exclude from LLM context.

Identifies metadata keys not explicitly included in LLM processing and marks them for exclusion.

Source code in src/embedding/datasources/notion/document.py
51
52
53
54
55
56
57
58
59
60
61
62
def _set_excluded_llm_metadata_keys(self) -> None:
    """Mark metadata keys that must not be passed to the LLM context.

    Every key of ``self.metadata`` that is absent from
    ``self.included_llm_metadata_keys`` is stored, in metadata order, in
    ``self.excluded_llm_metadata_keys``.
    """
    excluded = []
    for name in self.metadata:
        if name in self.included_llm_metadata_keys:
            continue
        excluded.append(name)
    self.excluded_llm_metadata_keys = excluded

from_page(metadata, text) classmethod

Create NotionDocument instance from page data.

Parameters:
  • metadata (dict) –

    Dictionary containing page metadata

  • text (str) –

    Extracted page content

Returns:
  • NotionDocument –

    Configured document instance

Source code in src/embedding/datasources/notion/document.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@classmethod
def from_page(cls, metadata: dict, text: str) -> "NotionDocument":
    """Build a NotionDocument from exported page data.

    Args:
        metadata: Page metadata dictionary, enriched through
            ``NotionDocument._get_metadata`` before being stored
        text: Extracted page content

    Returns:
        NotionDocument: Instance with its excluded embedding and LLM
        metadata keys already configured
    """
    enriched = NotionDocument._get_metadata(metadata)
    instance = cls(attachments={}, text=text, metadata=enriched)
    instance._set_excluded_embed_metadata_keys()
    instance._set_excluded_llm_metadata_keys()
    return instance

Exporter

NotionExporter

Exporter for converting Notion pages to markdown documents.

Handles extraction and conversion of Notion pages and databases to NotionDocument instances with markdown content.

Attributes:
  • notion_exporter

    Core exporter instance for content extraction

Source code in src/embedding/datasources/notion/exporter.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
class NotionExporter:
    """Export Notion pages and databases as markdown documents.

    Wraps a ``_NotionExporterCore`` and turns each extracted object into a
    ``NotionDocument``.

    Attributes:
        notion_exporter: Core exporter instance performing the extraction
    """

    def __init__(
        self,
        api_token: str,
    ):
        """Create the underlying core exporter.

        Child-page export is disabled and page-metadata extraction is enabled.

        Args:
            api_token: Authentication token for Notion API
        """
        core_options = {
            "notion_token": api_token,
            "export_child_pages": False,
            "extract_page_metadata": True,
        }
        self.notion_exporter = _NotionExporterCore(**core_options)

    async def run(
        self, page_ids: List[str] = None, database_ids: List[str] = None
    ) -> List[NotionDocument]:
        """Export the requested Notion objects as documents.

        Args:
            page_ids: List of page IDs to export
            database_ids: List of database IDs to export

        Returns:
            List[NotionDocument]: One document per extracted object

        Raises:
            ValueError: If neither page_ids nor database_ids provided
        """
        exported = await self.notion_exporter.async_export_pages(
            page_ids=page_ids, database_ids=database_ids
        )

        # Only the extracted payloads are needed; the object IDs are unused.
        return [
            NotionDocument.from_page(
                metadata=payload["metadata"],
                text=payload["content"],
            )
            for payload in exported.values()
        ]

__init__(api_token)

Initialize Notion exporter.

Parameters:
  • api_token (str) –

    Authentication token for Notion API

Source code in src/embedding/datasources/notion/exporter.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def __init__(
    self,
    api_token: str,
):
    """Create the underlying core exporter.

    Child-page export is disabled and page-metadata extraction is enabled.

    Args:
        api_token: Authentication token for Notion API
    """
    core_options = {
        "notion_token": api_token,
        "export_child_pages": False,
        "extract_page_metadata": True,
    }
    self.notion_exporter = _NotionExporterCore(**core_options)

run(page_ids=None, database_ids=None) async

Export Notion content to document collection.

Parameters:
  • page_ids (List[str], default: None ) –

    List of page IDs to export

  • database_ids (List[str], default: None ) –

    List of database IDs to export

Returns:
  • List[NotionDocument]

    List[NotionDocument]: Collection of exported documents

Raises:
  • ValueError

    If neither page_ids nor database_ids provided

Source code in src/embedding/datasources/notion/exporter.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
async def run(
    self, page_ids: List[str] = None, database_ids: List[str] = None
) -> List[NotionDocument]:
    """Export the requested Notion objects as documents.

    Args:
        page_ids: List of page IDs to export
        database_ids: List of database IDs to export

    Returns:
        List[NotionDocument]: One document per extracted object

    Raises:
        ValueError: If neither page_ids nor database_ids provided
    """
    exported = await self.notion_exporter.async_export_pages(
        page_ids=page_ids, database_ids=database_ids
    )

    # Only the extracted payloads are needed; the object IDs are unused.
    return [
        NotionDocument.from_page(
            metadata=payload["metadata"],
            text=payload["content"],
        )
        for payload in exported.values()
    ]

_BlockConverter

Bases: BlockConverter

Source code in src/embedding/datasources/notion/exporter.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class _BlockConverter(BlockConverter):
    """Block converter that falls back to a placeholder instead of raising."""

    def convert_block(
        self, block: dict, indent: bool = False, indent_level: int = 0
    ) -> str:
        """Convert a block to a Markdown string.

        Delegates to the parent implementation. Any exception it raises is
        logged as a warning and replaced by a placeholder so that one bad
        block does not abort the export.

        Args:
            block: Block dictionary to convert
            indent: Forwarded unchanged to the parent converter
            indent_level: Forwarded unchanged to the parent converter

        Returns:
            str: Converted Markdown, or the literal string ``"None"`` when
            conversion fails
        """
        try:
            return super().convert_block(block, indent, indent_level)
        except Exception:
            # Deliberate best-effort: report the failure and keep exporting.
            logging.warning(
                f"Failed to convert block: {traceback.format_exc()}. Using 'None'."
            )
            return "None"

convert_block(block, indent=False, indent_level=0)

Converts a block to a Markdown string.

Source code in src/embedding/datasources/notion/exporter.py
62
63
64
65
66
67
68
69
70
71
72
73
74
def convert_block(
    self, block: dict, indent: bool = False, indent_level: int = 0
) -> str:
    """Convert a block to a Markdown string.

    Delegates to the parent implementation. Any exception it raises is
    logged as a warning and replaced by a placeholder so that one bad
    block does not abort the export.

    Args:
        block: Block dictionary to convert
        indent: Forwarded unchanged to the parent converter
        indent_level: Forwarded unchanged to the parent converter

    Returns:
        str: Converted Markdown, or the literal string ``"None"`` when
        conversion fails
    """
    try:
        return super().convert_block(block, indent, indent_level)
    except Exception:
        # Deliberate best-effort: report the failure and keep exporting.
        logging.warning(
            f"Failed to convert block: {traceback.format_exc()}. Using 'None'."
        )
        return "None"

_NotionExporterCore

Bases: NotionExporter

Custom version of notion_exporter.exporter.NotionExporter. The modifications relate to metadata parsing and asynchronous execution. A large part of the code is taken from the original implementation; modifications are marked with Custom modification comments.

Source code in src/embedding/datasources/notion/exporter.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
class _NotionExporterCore(NotionExporterCore):
    """
    Custom version of `notion_exporter.exporter.NotionExporter`. Modifications are related to metadata parsing and asynchronous execution.
    Large amount of code corresponds to the original implementation. Modifications are marked with `Custom modification` comments.
    """

    def __init__(
        self,
        notion_token: str,
        export_child_pages: bool = False,
        extract_page_metadata: bool = False,
        exclude_title_containing: Optional[str] = None,
    ):
        """Initialize the parent exporter and install the custom converters.

        All arguments are forwarded unchanged to the parent constructor;
        afterwards ``property_converter`` and ``block_converter`` are set to
        the project's ``_PropertyConverter`` and ``_BlockConverter``.

        :param notion_token: Token forwarded to the parent exporter.
        :param export_child_pages: Forwarded to the parent exporter.
        :param extract_page_metadata: Forwarded to the parent exporter.
        :param exclude_title_containing: Forwarded to the parent exporter.
        """
        parent_options = {
            "notion_token": notion_token,
            "export_child_pages": export_child_pages,
            "extract_page_metadata": extract_page_metadata,
            "exclude_title_containing": exclude_title_containing,
        }
        super().__init__(**parent_options)
        self.property_converter = _PropertyConverter(self)
        self.block_converter = _BlockConverter()

    @retry_decorator
    async def _get_page_meta(self, page_id: str) -> dict:
        """
        Retrieve metadata of a page from Notion.
        Custom modification:
            - Remove `created_by` and `last_edited_by` calls.
            - Add `created_time`, `type` and `format`.

        For a page whose parent is a database, the title comes from the
        property of type `title` and every property is converted and returned
        under `properties`. For any other page, the title is read from the
        `Page` or `title` property; when neither exists or extraction fails,
        the title falls back to the string 'None'.

        :param page_id: The ID of the page.
        :return: A dictionary containing metadata of the page.
        """
        page_object = await self.notion.pages.retrieve(page_id)
        # Custom modification ---
        # Remove user-related calls
        # --- Custom modification

        # Database entries don't have an explicit title property, but a title column
        # Also, we extract all properties from the database entry to be able to add them to the markdown page as
        # key-value pairs
        properties = {}
        if page_object["parent"]["type"] == "database_id":
            title = ""
            for prop_name, prop in page_object["properties"].items():
                if prop["type"] == "title":
                    title = (
                        prop["title"][0]["plain_text"] if prop["title"] else ""
                    )
                properties[prop_name] = (
                    self.property_converter.convert_property(prop)
                )
        else:
            # Default up front so `title` is always bound, even when the page
            # has neither a "Page" nor a "title" property.
            title = "None"
            try:
                page_properties = page_object["properties"]
                if "Page" in page_properties:
                    title = page_properties["Page"]["title"][0]["plain_text"]
                elif "title" in page_properties:
                    title = page_properties["title"]["title"][0]["plain_text"]
                else:
                    logging.warning(
                        f"No title property found for page {page_id}. Using 'None'."
                    )
            except Exception:
                logging.warning(
                    f"Failed to extract title: {traceback.format_exc()}. Using 'None'."
                )
                title = "None"

        page_meta = {
            "title": title,
            "url": page_object["url"],
            # Custom modification ---
            # Remove user-related calls `created_by` and `last_edited_by`
            "created_time": page_object["created_time"],
            "type": "page",
            "format": "md",
            # --- Custom modification
            "last_edited_time": page_object["last_edited_time"],
            "page_id": page_object["id"],
            "parent_id": page_object["parent"][page_object["parent"]["type"]],
        }
        if properties:
            page_meta["properties"] = properties

        return page_meta

    @retry_decorator
    async def _get_database_meta(self, database_id: str) -> dict:
        """
        Fetch the metadata of a Notion database.
        Custom modification:
            - Remove `created_by` and `last_edited_by` calls.
            - Add `created_time`, `type` and `format`.

        When the API answers with `object_not_found` or `validation_error`,
        placeholder metadata with empty strings is returned instead.

        :param database_id: The ID of the database.
        :return: A dictionary containing metadata of the database.
        """
        try:
            database_object = await self.notion.databases.retrieve(database_id)
        except APIResponseError as exc:
            if exc.code not in ["object_not_found", "validation_error"]:
                raise exc
            # Database is not available via API, might be a linked database
            return {
                "title": "Untitled",
                "url": "",
                # Custom modification ---
                # Remove user-related calls `created_by` and `last_edited_by`
                "type": "database",
                "created_time": "",
                "format": "md",
                # --- Custom modification
                "last_edited_time": "",
                "page_id": database_id,
                "parent_id": "",
            }

        # Custom modification ---
        # Remove user-related calls
        # --- Custom modification
        if database_object["title"]:
            title = database_object["title"][0]["plain_text"]
        else:
            title = "Untitled"

        return {
            "title": title,
            "url": database_object["url"],
            # Custom modification ---
            # Remove user-related calls `created_by` and `last_edited_by`
            "type": "database",
            "created_time": database_object["created_time"],
            "format": "md",
            # --- Custom modification
            "last_edited_time": database_object["last_edited_time"],
            "page_id": database_object["id"],
            "parent_id": database_object["parent"][
                database_object["parent"]["type"]
            ],
        }

    # Retried for at most 3 attempts, and only for exceptions accepted by the
    # rate-limit / unavailable predicates.
    @retry(
        retry=(
            retry_if_exception(predicate=is_rate_limit_exception)
            | retry_if_exception(predicate=is_unavailable_exception)
        ),
        wait=wait_for_retry_after_header(fallback=wait_exponential()),
        stop=stop_after_attempt(3),
    )
    async def _get_database_content(
        self, database_id: str
    ) -> tuple[str, set[str]]:
        """
        Render a Notion database as a markdown table.

        The output is the database description followed by a table whose first
        column is the title column. Pipe characters inside cell values are
        replaced by spaces so they cannot break the table layout.

        :param database_id: The ID of the database.
        :return: Tuple of the markdown text and the set of entry IDs. Both are
            empty when the API reports `object_not_found` or
            `validation_error`.
        """
        try:
            database = await self.notion.databases.retrieve(database_id)
            database_entries = await async_collect_paginated_api(
                self.notion.databases.query, database_id=database_id
            )
            entry_ids = set()

            # Only the first element of the description is used.
            description = (
                database["description"][0]["plain_text"]
                if database["description"]
                else ""
            )

            # Name of the first property of type "title"; raises IndexError if
            # the database has none.
            title_column = [
                col_name
                for col_name, col in database["properties"].items()
                if col["type"] == "title"
            ][0]
            db_page_header = f"{description}\n\n"
            # Header row: title column first, then every other property name.
            table_header = f"|{title_column}|{'|'.join([prop['name'] for prop in database['properties'].values() if prop['name'] != title_column])}|\n"
            # Separator row: one "---" cell per database property.
            table_header += "|" + "---|" * (len(database["properties"])) + "\n"
            table_body = ""

            # NOTE(review): header order comes from database["properties"] while
            # cell order comes from entry["properties"]; presumably both iterate
            # in the same order -- confirm, otherwise columns are misaligned.
            for entry in database_entries:
                table_body += f"|{self.property_converter.convert_property(entry['properties'][title_column]).replace('|', ' ')}|"
                table_body += "|".join(
                    [
                        self.property_converter.convert_property(prop).replace(
                            "|", " "
                        )
                        for prop_name, prop in entry["properties"].items()
                        if prop_name != title_column
                    ]
                )
                table_body += "|\n"
                entry_ids.add(entry["id"])

            db_page = f"{db_page_header}{table_header}{table_body}"
        except APIResponseError as exc:
            # Database is not available via API, might be a linked database
            if exc.code in ["object_not_found", "validation_error"]:
                db_page = ""
                entry_ids = set()
            else:
                raise exc

        return db_page, entry_ids

    async def async_export_pages(
        self,
        page_ids: Optional[list[str]] = None,
        database_ids: Optional[list[str]] = None,
        ids_to_exclude: Optional[list[str]] = None,
    ) -> dict[str, str]:
        """
        Export the given pages and databases to markdown.

        All IDs are normalized with `_normalize_id`, excluded IDs are removed
        from both sets, and the rest is handed to `_async_export_pages`.

        :param page_ids: List of page IDs to export.
        :param database_ids: List of database IDs to export.
        :param ids_to_exclude: List of IDs to ignore.
        :return: The extracted pages returned by `_async_export_pages`.
        :raises ValueError: If both page_ids and database_ids are None.
        """
        if page_ids is None and database_ids is None:
            raise ValueError(
                "Either page_ids or database_ids must be specified."
            )

        def normalized(ids) -> set:
            # A missing argument behaves like an empty collection.
            if ids is None:
                return set()
            return set(map(self._normalize_id, ids))

        wanted_pages = normalized(page_ids)
        wanted_databases = normalized(database_ids)
        excluded = normalized(ids_to_exclude)

        extracted_pages, _, _ = await self._async_export_pages(
            page_ids=wanted_pages - excluded,
            database_ids=wanted_databases - excluded,
            ids_to_exclude=excluded,
        )

        return extracted_pages

    async def _async_export_pages(
        self,
        page_ids: set[str],
        database_ids: set[str],
        ids_to_exclude: Optional[set] = None,
        parent_page_ids: Optional[dict] = None,
        page_paths: Optional[dict] = None,
    ) -> tuple[dict, set, set]:
        """
        Export pages and databases to markdown format, optionally recursing
        into the child pages and child databases discovered along the way.

        The sets passed in are mutated in place: already-excluded IDs are
        removed from ``page_ids`` / ``database_ids`` and every ID that gets
        processed is added to ``ids_to_exclude``. Because the same
        ``ids_to_exclude`` object is forwarded to the recursive call, it acts
        as the "already visited" set for the whole traversal.

        :param page_ids: Set of page IDs to export (mutated in place).
        :param database_ids: Set of database IDs to export (mutated in place).
        :param ids_to_exclude: Set of IDs to ignore (mutated in place).
        :param parent_page_ids: Mapping child ID -> parent ``page_id``; filled
            here and forwarded to the recursive call, but not returned.
        :param page_paths: Forwarded unchanged to the recursive call; it is
            neither read nor written in this method.
        :return: Tuple ``(extracted_pages, child_pages, child_databases)``
            where ``extracted_pages`` maps a ``page_id`` to a dict with the
            keys ``"content"`` and ``"metadata"``.
        """
        if ids_to_exclude is None:
            ids_to_exclude = set()
        if page_paths is None:
            page_paths = {}
        if parent_page_ids is None:
            parent_page_ids = {}

        # Drop everything that was already handled, then mark the remaining
        # IDs as handled so that deeper recursion levels skip them.
        page_ids -= ids_to_exclude
        database_ids -= ids_to_exclude
        ids_to_exclude.update(page_ids)
        ids_to_exclude.update(database_ids)

        extracted_pages = {}
        child_pages = set()
        child_databases = set()
        if page_ids:
            for page_id in page_ids:
                logging.info(f"Fetching page {page_id}.")
            # Both task lists iterate the same unmodified set, so their order
            # matches and the results can be paired with zip() below.
            page_meta_tasks = [
                self._get_page_meta(page_id) for page_id in page_ids
            ]
            page_content_tasks = [
                self._get_block_content(page_id) for page_id in page_ids
            ]
            # Metadata for all pages is awaited first, then all page contents.
            page_details_results = await asyncio.gather(*page_meta_tasks)
            page_content_results = await asyncio.gather(*page_content_tasks)
            # Also exclude the IDs as reported in the metadata.
            # NOTE(review): presumably these can be formatted differently from
            # the requested IDs — confirm.
            ids_to_exclude.update(
                page["page_id"] for page in page_details_results
            )

            for page_details, (
                markdown,
                child_page_ids,
                child_database_ids,
            ) in zip(page_details_results, page_content_results):
                # Case-insensitive title filter; a skipped page also does not
                # contribute its children to the traversal.
                if (
                    self.exclude_title_containing
                    and self.exclude_title_containing.lower()
                    in page_details.get("title", "").lower()
                ):
                    continue
                for child_page_id in child_page_ids:
                    parent_page_ids[child_page_id] = page_details["page_id"]
                for child_database_id in child_database_ids:
                    parent_page_ids[child_database_id] = page_details["page_id"]
                # Custom modification ---
                # No frontmatter is generated; content and metadata are kept
                # as separate entries of the result dict instead.
                extracted_pages[page_details["page_id"]] = {
                    "content": "\n".join(markdown),
                    "metadata": page_details,
                }
                # --- Custom modification
                child_pages.update(child_page_ids)
                child_databases.update(child_database_ids)

        if database_ids:
            for database_id in database_ids:
                logging.info(f"Fetching database {database_id}.")
            database_meta_tasks = [
                self._get_database_meta(database_id)
                for database_id in database_ids
            ]
            database_content_tasks = [
                self._get_database_content(database_id)
                for database_id in database_ids
            ]
            # Unlike for pages, database contents are awaited before metadata.
            database_content_results = await asyncio.gather(
                *database_content_tasks
            )
            database_details_results = await asyncio.gather(
                *database_meta_tasks
            )
            ids_to_exclude.update(
                database["page_id"] for database in database_details_results
            )

            for db_details, (markdown, entry_ids) in zip(
                database_details_results, database_content_results
            ):
                # Same case-insensitive title filter as for pages.
                if (
                    self.exclude_title_containing
                    and self.exclude_title_containing.lower()
                    in db_details.get("title", "").lower()
                ):
                    continue
                for entry_id in entry_ids:
                    parent_page_ids[entry_id] = db_details["page_id"]
                # Custom modification ---
                # No frontmatter is generated; the database markdown is stored
                # as returned by `_get_database_content`.
                extracted_pages[db_details["page_id"]] = {
                    "content": markdown,
                    "metadata": db_details,
                }
                # --- Custom modification
                # Database entries are exported as child pages.
                child_pages.update(entry_ids)

        # Recurse into the discovered children; the shared `ids_to_exclude`
        # set prevents exporting the same object twice.
        if self.export_child_pages and (child_pages or child_databases):
            extracted_child_pages, _, _ = await self._async_export_pages(
                page_ids=child_pages,
                database_ids=child_databases,
                ids_to_exclude=ids_to_exclude,
                parent_page_ids=parent_page_ids,
                page_paths=page_paths,
            )
            extracted_pages.update(extracted_child_pages)

        return extracted_pages, child_pages, child_databases

_async_export_pages(page_ids, database_ids, ids_to_exclude=None, parent_page_ids=None, page_paths=None) async

Export pages and databases to markdown format.

Parameters: `page_ids` – list of page IDs to export; `database_ids` – list of database IDs to export; `ids_to_exclude` – list of IDs to ignore.

Source code in src/embedding/datasources/notion/exporter.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
async def _async_export_pages(
    self,
    page_ids: set[str],
    database_ids: set[str],
    ids_to_exclude: Optional[set] = None,
    parent_page_ids: Optional[dict] = None,
    page_paths: Optional[dict] = None,
) -> tuple[dict, set, set]:
    """
    Export pages and databases to markdown format, optionally recursing
    into the child pages and child databases discovered along the way.

    The sets passed in are mutated in place: already-excluded IDs are
    removed from ``page_ids`` / ``database_ids`` and every ID that gets
    processed is added to ``ids_to_exclude``. Because the same
    ``ids_to_exclude`` object is forwarded to the recursive call, it acts
    as the "already visited" set for the whole traversal.

    :param page_ids: Set of page IDs to export (mutated in place).
    :param database_ids: Set of database IDs to export (mutated in place).
    :param ids_to_exclude: Set of IDs to ignore (mutated in place).
    :param parent_page_ids: Mapping child ID -> parent ``page_id``; filled
        here and forwarded to the recursive call, but not returned.
    :param page_paths: Forwarded unchanged to the recursive call; it is
        neither read nor written in this method.
    :return: Tuple ``(extracted_pages, child_pages, child_databases)``
        where ``extracted_pages`` maps a ``page_id`` to a dict with the
        keys ``"content"`` and ``"metadata"``.
    """
    if ids_to_exclude is None:
        ids_to_exclude = set()
    if page_paths is None:
        page_paths = {}
    if parent_page_ids is None:
        parent_page_ids = {}

    # Drop everything that was already handled, then mark the remaining
    # IDs as handled so that deeper recursion levels skip them.
    page_ids -= ids_to_exclude
    database_ids -= ids_to_exclude
    ids_to_exclude.update(page_ids)
    ids_to_exclude.update(database_ids)

    extracted_pages = {}
    child_pages = set()
    child_databases = set()
    if page_ids:
        for page_id in page_ids:
            logging.info(f"Fetching page {page_id}.")
        # Both task lists iterate the same unmodified set, so their order
        # matches and the results can be paired with zip() below.
        page_meta_tasks = [
            self._get_page_meta(page_id) for page_id in page_ids
        ]
        page_content_tasks = [
            self._get_block_content(page_id) for page_id in page_ids
        ]
        # Metadata for all pages is awaited first, then all page contents.
        page_details_results = await asyncio.gather(*page_meta_tasks)
        page_content_results = await asyncio.gather(*page_content_tasks)
        # Also exclude the IDs as reported in the metadata.
        # NOTE(review): presumably these can be formatted differently from
        # the requested IDs — confirm.
        ids_to_exclude.update(
            page["page_id"] for page in page_details_results
        )

        for page_details, (
            markdown,
            child_page_ids,
            child_database_ids,
        ) in zip(page_details_results, page_content_results):
            # Case-insensitive title filter; a skipped page also does not
            # contribute its children to the traversal.
            if (
                self.exclude_title_containing
                and self.exclude_title_containing.lower()
                in page_details.get("title", "").lower()
            ):
                continue
            for child_page_id in child_page_ids:
                parent_page_ids[child_page_id] = page_details["page_id"]
            for child_database_id in child_database_ids:
                parent_page_ids[child_database_id] = page_details["page_id"]
            # Custom modification ---
            # No frontmatter is generated; content and metadata are kept
            # as separate entries of the result dict instead.
            extracted_pages[page_details["page_id"]] = {
                "content": "\n".join(markdown),
                "metadata": page_details,
            }
            # --- Custom modification
            child_pages.update(child_page_ids)
            child_databases.update(child_database_ids)

    if database_ids:
        for database_id in database_ids:
            logging.info(f"Fetching database {database_id}.")
        database_meta_tasks = [
            self._get_database_meta(database_id)
            for database_id in database_ids
        ]
        database_content_tasks = [
            self._get_database_content(database_id)
            for database_id in database_ids
        ]
        # Unlike for pages, database contents are awaited before metadata.
        database_content_results = await asyncio.gather(
            *database_content_tasks
        )
        database_details_results = await asyncio.gather(
            *database_meta_tasks
        )
        ids_to_exclude.update(
            database["page_id"] for database in database_details_results
        )

        for db_details, (markdown, entry_ids) in zip(
            database_details_results, database_content_results
        ):
            # Same case-insensitive title filter as for pages.
            if (
                self.exclude_title_containing
                and self.exclude_title_containing.lower()
                in db_details.get("title", "").lower()
            ):
                continue
            for entry_id in entry_ids:
                parent_page_ids[entry_id] = db_details["page_id"]
            # Custom modification ---
            # No frontmatter is generated; the database markdown is stored
            # as returned by `_get_database_content`.
            extracted_pages[db_details["page_id"]] = {
                "content": markdown,
                "metadata": db_details,
            }
            # --- Custom modification
            # Database entries are exported as child pages.
            child_pages.update(entry_ids)

    # Recurse into the discovered children; the shared `ids_to_exclude`
    # set prevents exporting the same object twice.
    if self.export_child_pages and (child_pages or child_databases):
        extracted_child_pages, _, _ = await self._async_export_pages(
            page_ids=child_pages,
            database_ids=child_databases,
            ids_to_exclude=ids_to_exclude,
            parent_page_ids=parent_page_ids,
            page_paths=page_paths,
        )
        extracted_pages.update(extracted_child_pages)

    return extracted_pages, child_pages, child_databases

_get_database_meta(database_id) async

Retrieve metadata of a database from Notion. Custom modifications: (1) the `created_by` and `last_edited_by` user lookups are removed; (2) the `created_time`, `type` and `format` fields are added.

:param database_id: The ID of the database. :return: A dictionary containing metadata of the database.

Source code in src/embedding/datasources/notion/exporter.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
@retry_decorator
async def _get_database_meta(self, database_id: str) -> dict:
    """
    Fetch a database object from Notion and condense it into a metadata dict.

    Custom modification compared to the upstream exporter:
        - The `created_by` and `last_edited_by` user lookups are removed.
        - The `created_time`, `type` and `format` fields are added.

    When the API answers with ``object_not_found`` or ``validation_error``
    a placeholder record (title ``"Untitled"``, empty strings, and the
    requested ``database_id`` as ``page_id``) is returned instead of raising.

    :param database_id: The ID of the database.
    :return: Dict with the keys ``title``, ``url``, ``type``,
        ``created_time``, ``format``, ``last_edited_time``, ``page_id`` and
        ``parent_id``.
    :raises APIResponseError: For every other API error code.
    """
    try:
        db = await self.notion.databases.retrieve(database_id)

        # Only the first title fragment is used; an empty title list maps
        # to "Untitled".
        title_fragments = db["title"]
        if title_fragments:
            db_title = title_fragments[0]["plain_text"]
        else:
            db_title = "Untitled"

        return {
            "title": db_title,
            "url": db["url"],
            # Custom modification ---
            # No user-related fields (`created_by`, `last_edited_by`)
            "type": "database",
            "created_time": db["created_time"],
            "format": "md",
            # --- Custom modification
            "last_edited_time": db["last_edited_time"],
            "page_id": db["id"],
            # The parent dict stores its ID under the key named by "type".
            "parent_id": db["parent"][db["parent"]["type"]],
        }
    except APIResponseError as exc:
        if exc.code not in ["object_not_found", "validation_error"]:
            raise exc

        # Database is not available via API, might be a linked database
        return {
            "title": "Untitled",
            "url": "",
            # Custom modification ---
            # No user-related fields (`created_by`, `last_edited_by`)
            "type": "database",
            "created_time": "",
            "format": "md",
            # --- Custom modification
            "last_edited_time": "",
            "page_id": database_id,
            "parent_id": "",
        }

_get_page_meta(page_id) async

Retrieve metadata of a page from Notion. Custom modifications: (1) the `created_by` and `last_edited_by` user lookups are removed; (2) the `created_time`, `type` and `format` fields are added.

:param page_id: The ID of the page. :return: A dictionary containing metadata of the page.

Source code in src/embedding/datasources/notion/exporter.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@retry_decorator
async def _get_page_meta(self, page_id: str) -> dict:
    """
    Retrieve metadata of a page from Notion.
    Custom modification:
        - Remove `created_by` and `last_edited_by` calls.
        - Add `created_time`, `type` and `format`.

    For database entries (parent type ``database_id``) the title is read
    from the property of type ``title`` and every property is converted via
    ``self.property_converter.convert_property`` and returned under the
    ``properties`` key. For all other pages the title is read from the
    ``Page`` or ``title`` property; if neither exists, or extraction fails,
    the title falls back to the string ``"None"``.

    :param page_id: The ID of the page.
    :return: A dictionary containing metadata of the page.
    """
    page_object = await self.notion.pages.retrieve(page_id)
    # Custom modification ---
    # Remove user-related calls
    # --- Custom modification

    # Database entries don't have an explicit title property, but a title column
    # Also, we extract all properties from the database entry to be able to add them to the markdown page as
    # key-value pairs
    properties = {}
    if page_object["parent"]["type"] == "database_id":
        title = ""
        for prop_name, prop in page_object["properties"].items():
            if prop["type"] == "title":
                title = (
                    prop["title"][0]["plain_text"] if prop["title"] else ""
                )
            properties[prop_name] = (
                self.property_converter.convert_property(prop)
            )
    else:
        # Bind the fallback up front: previously `title` stayed unbound when
        # the page had neither a "Page" nor a "title" property, which made
        # the `page_meta` construction below fail with UnboundLocalError.
        title = "None"
        try:
            if "Page" in page_object["properties"]:
                title = page_object["properties"]["Page"]["title"][0][
                    "plain_text"
                ]
            elif "title" in page_object["properties"]:
                title = page_object["properties"]["title"]["title"][0][
                    "plain_text"
                ]
            else:
                logging.warning(
                    f"No title property found for page {page_id}. Using 'None'."
                )
        except Exception:
            # Best effort: e.g. an empty title list raises IndexError.
            logging.warning(
                f"Failed to extract title: {traceback.format_exc()}. Using 'None'."
            )
            title = "None"

    page_meta = {
        "title": title,
        "url": page_object["url"],
        # Custom modification ---
        # Remove user-related calls `created_by` and `last_edited_by`
        "created_time": page_object["created_time"],
        "type": "page",
        "format": "md",
        # --- Custom modification
        "last_edited_time": page_object["last_edited_time"],
        "page_id": page_object["id"],
        # The parent dict stores its ID under the key named by "type".
        "parent_id": page_object["parent"][page_object["parent"]["type"]],
    }
    # `properties` is only populated for database entries.
    if properties:
        page_meta["properties"] = properties

    return page_meta

async_export_pages(page_ids=None, database_ids=None, ids_to_exclude=None) async

Export pages and databases to markdown files.

:param page_ids: List of page IDs to export. :param database_ids: List of database IDs to export. :param ids_to_exclude: List of IDs to ignore.

Source code in src/embedding/datasources/notion/exporter.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
async def async_export_pages(
    self,
    page_ids: Optional[list[str]] = None,
    database_ids: Optional[list[str]] = None,
    ids_to_exclude: Optional[list[str]] = None,
) -> dict[str, str]:
    """
    Public entry point: export the requested pages and databases.

    Every incoming ID is passed through ``self._normalize_id``; excluded
    IDs are removed from both request sets before the work is handed to
    ``self._async_export_pages``. Only the first element of that helper's
    result (the extracted pages mapping) is returned.

    :param page_ids: IDs of the pages to export (``None`` means no pages).
    :param database_ids: IDs of the databases to export (``None`` means
        no databases).
    :param ids_to_exclude: IDs that must not be exported.
    :return: Mapping produced by ``_async_export_pages``.
    :raises ValueError: If both ``page_ids`` and ``database_ids`` are
        ``None``.
    """
    if page_ids is None and database_ids is None:
        raise ValueError(
            "Either page_ids or database_ids must be specified."
        )

    # Normalize in the order pages -> databases -> exclusions.
    requested_pages = {
        self._normalize_id(raw_id)
        for raw_id in (() if page_ids is None else page_ids)
    }
    requested_databases = {
        self._normalize_id(raw_id)
        for raw_id in (() if database_ids is None else database_ids)
    }
    excluded = {
        self._normalize_id(raw_id)
        for raw_id in (() if ids_to_exclude is None else ids_to_exclude)
    }

    exported, _child_pages, _child_databases = (
        await self._async_export_pages(
            page_ids=requested_pages - excluded,
            database_ids=requested_databases - excluded,
            ids_to_exclude=excluded,
        )
    )
    return exported

_PropertyConverter

Bases: PropertyConverter

Source code in src/embedding/datasources/notion/exporter.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class _PropertyConverter(PropertyConverter):
    """Property converter that also handles ``verification`` properties and
    never lets a conversion failure propagate."""

    def __init__(self, notion_exporter: NotionExporterCore):
        """Set up the base converter and register the ``verification`` handler."""
        super().__init__(notion_exporter)
        self.type_specific_converters["verification"] = self.verification

    def verification(self, property_item: dict) -> str:
        """
        Return the ``state`` of a verification property.
        """
        verification_data = property_item["verification"]
        return verification_data["state"]

    def convert_property(self, property_item: dict) -> str:
        """
        Convert a Notion property via the parent class; on any exception
        log a warning with the traceback and return the string ``"None"``.
        """
        try:
            converted = super().convert_property(property_item)
        except Exception:
            logging.warning(
                f"Failed to convert property: {traceback.format_exc()}. Using 'None'."
            )
            converted = "None"
        return converted

convert_property(property_item)

Converts a Notion property to a Markdown string.

Source code in src/embedding/datasources/notion/exporter.py
47
48
49
50
51
52
53
54
55
56
57
def convert_property(self, property_item: dict) -> str:
    """
    Convert a Notion property via the parent class; on any exception
    log a warning with the traceback and return the string ``"None"``.
    """
    try:
        converted = super().convert_property(property_item)
    except Exception:
        logging.warning(
            f"Failed to convert property: {traceback.format_exc()}. Using 'None'."
        )
        converted = "None"
    return converted

verification(property_item)

Converts a verification property to a Markdown string.

Source code in src/embedding/datasources/notion/exporter.py
41
42
43
44
45
def verification(self, property_item: dict) -> str:
    """
    Return the ``state`` of a verification property.
    """
    verification_data = property_item["verification"]
    return verification_data["state"]

Manager

NotionDatasourceManager

Bases: DatasourceManager

Manager for Notion content extraction and processing.

Handles document retrieval, cleaning, splitting and embedding updates for Notion workspaces. Implements the base DatasourceManager interface for Notion-specific processing.

Source code in src/embedding/datasources/notion/manager.py
 4
 5
 6
 7
 8
 9
10
11
12
class NotionDatasourceManager(DatasourceManager):
    """Datasource manager for Notion workspaces.

    Adds no members of its own; all behaviour is inherited unchanged from
    ``DatasourceManager``. The subclass exists to give the Notion
    datasource its own manager type.
    """

Reader

NotionReader

Bases: BaseReader

Reader for extracting documents from Notion workspace.

Implements document extraction from Notion pages and databases with support for batched async operations and export limits.

Attributes:
  • notion_client

    Client for Notion API interactions

  • export_batch_size

    Number of objects to export concurrently

  • export_limit

    Maximum number of objects to export

  • exporter

    Component for converting Notion content to documents

  • home_page_database_id

    ID of root database containing content index

Source code in src/embedding/datasources/notion/reader.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
class NotionReader(BaseReader):
    """Reader for extracting documents from Notion workspace.

    Implements document extraction from Notion pages and databases with
    support for batched async operations and export limits.

    Attributes:
        notion_client: Client for Notion API interactions
        export_batch_size: Number of object IDs passed to the exporter per call
        export_limit: Maximum number of objects to export (None for unlimited)
        exporter: Component for converting Notion content to documents
        home_page_database_id: ID of root database containing content index
    """

    def __init__(
        self,
        configuration: NotionDatasourceConfiguration,
        notion_client: Client,
        exporter: NotionExporter,
    ):
        """Initialize Notion reader.

        Args:
            configuration: Settings for Notion access and limits
            notion_client: Client for Notion API interaction
            exporter: Component for content export and conversion
        """
        super().__init__()
        self.notion_client = notion_client
        self.export_batch_size = configuration.export_batch_size
        self.export_limit = configuration.export_limit
        self.exporter = exporter
        self.home_page_database_id = configuration.home_page_database_id

    def get_all_documents(self) -> List[NotionDocument]:
        """
        Synchronous implementation for fetching all documents from the data source.

        Note:
            Intentionally left empty: the body does nothing and returns
            ``None``. Use ``get_all_documents_async`` to read documents.
        """
        pass

    async def get_all_documents_async(self) -> List[NotionDocument]:
        """Asynchronously retrieve all documents from Notion.

        Fetches pages and databases in batches, respecting export limits
        and batch sizes.

        Returns:
            List[NotionDocument]: Collection of processed documents
        """
        if self.home_page_database_id is None:
            database_ids = []
            page_ids = []
        else:
            database_ids, page_ids = self._get_ids_from_home_page()

        # Databases are collected first; the page search then only gets the
        # budget that is still left.
        database_ids.extend(
            self._get_all_ids(
                NotionObjectType.DATABASE,
                limit=self._get_current_limit(database_ids, page_ids),
            )
        )
        page_ids.extend(
            self._get_all_ids(
                NotionObjectType.PAGE,
                limit=self._get_current_limit(database_ids, page_ids),
            )
        )

        # Deduplicate IDs; the home page database itself is not exported.
        database_ids = set(database_ids)
        database_ids.discard(self.home_page_database_id)
        page_ids = set(page_ids)

        # Batch and export
        chunked_database_ids = list(
            chunked(database_ids, self.export_batch_size)
        )
        chunked_page_ids = list(chunked(page_ids, self.export_batch_size))

        database_documents, database_failed = await self._export_documents(
            chunked_database_ids, NotionObjectType.DATABASE
        )
        page_documents, page_failed = await self._export_documents(
            chunked_page_ids, NotionObjectType.PAGE
        )

        # Log failures
        if database_failed:
            logging.warning(
                f"Failed to export {len(database_failed)} databases: {database_failed}"
            )
        if page_failed:
            logging.warning(
                f"Failed to export {len(page_failed)} pages: {page_failed}"
            )

        # Apply limit if needed
        documents = database_documents + page_documents
        return (
            documents
            if self.export_limit is None
            else documents[: self.export_limit]
        )

    async def _export_documents(
        self, chunked_ids: List[List[str]], objects_type: NotionObjectType
    ) -> Tuple[List[NotionDocument], List[str]]:
        """Export documents in batches.

        A failing batch does not abort the export: the exception is logged
        and all IDs of that batch are reported as failed.

        Args:
            chunked_ids: Batched lists of object IDs
            objects_type: Type of Notion objects to export

        Returns:
            Tuple containing:
                - List of exported documents
                - List of failed export IDs
        """
        all_documents = []
        failed_exports = []
        num_chunks = len(chunked_ids)

        # Count batches from 1 so the progress prefix runs [1/N] .. [N/N].
        for i, chunk_ids in enumerate(chunked_ids, start=1):
            logging.info(
                f"[{i}/{num_chunks}] Exporting {objects_type.name} objects: {chunk_ids}"
            )

            try:
                documents = await self.exporter.run(
                    page_ids=(
                        chunk_ids
                        if objects_type == NotionObjectType.PAGE
                        else None
                    ),
                    database_ids=(
                        chunk_ids
                        if objects_type == NotionObjectType.DATABASE
                        else None
                    ),
                )
                all_documents.extend(documents)
                logging.info(
                    f"[{i}/{num_chunks}] Added {len(documents)} documents"
                )
            except Exception as e:
                logging.error(
                    f"[{i}/{num_chunks}] Export failed for {objects_type.name}: {chunk_ids}. {e}"
                )
                failed_exports.extend(chunk_ids)

        return all_documents, failed_exports

    def _get_ids_from_home_page(self) -> Tuple[List[str], List[str]]:
        """Extract database and page IDs from home page database.

        Queries the configured home page database and extracts IDs for
        both databases and pages.

        Returns:
            Tuple containing:
                - List of database IDs
                - List of page IDs
        """
        logging.info(
            f"Fetching all object ids from Notion's home page with limit {self.export_limit}..."
        )
        response = self._collect_paginated_api(
            function=self.notion_client.databases.query,
            limit=self.export_limit,
            database_id=self.home_page_database_id,
        )
        database_ids = [
            entry["id"] for entry in response if entry["object"] == "database"
        ]
        page_ids = [
            entry["id"] for entry in response if entry["object"] == "page"
        ]

        logging.info(
            f"Found {len(database_ids)} database ids and {len(page_ids)} page ids in Notion."
        )

        return database_ids, page_ids

    def _get_all_ids(
        self, objects_type: NotionObjectType, limit: int = None
    ) -> List[str]:
        """Fetch all IDs for specified Notion object type.

        Args:
            objects_type: Type of Notion objects to fetch
            limit: Maximum number of IDs to fetch (None for unlimited)

        Returns:
            List[str]: Collection of object IDs

        Note:
            Returns empty list if limit is 0 or negative
        """
        if limit is not None and limit <= 0:
            return []

        logging.info(
            f"Fetching all ids of {objects_type.name} objects from Notion with limit {limit}..."
        )

        params = {
            "filter": {
                "value": objects_type.name.lower(),
                "property": "object",
            },
        }
        # `_collect_paginated_api` already truncates to `limit`, so no extra
        # slicing is needed here.
        results = NotionReader._collect_paginated_api(
            self.notion_client.search, limit, **params
        )
        object_ids = [found["id"] for found in results]

        logging.info(
            f"Found {len(object_ids)} ids of {objects_type.name} objects in Notion."
        )

        return object_ids

    def _get_current_limit(
        self, database_ids: List[str], page_ids: List[str]
    ) -> int:
        """Calculate remaining object limit based on existing IDs.

        Args:
            database_ids: Currently collected database IDs
            page_ids: Currently collected page IDs

        Returns:
            int: Remaining limit (None if no limit configured). The value
            may be zero or negative when the budget is already used up.

        Note:
            Subtracts total of existing IDs from configured export limit.
            Only ``None`` means "unlimited"; a configured limit of 0 is
            honoured as "fetch nothing".
        """
        if self.export_limit is None:
            return None
        return self.export_limit - len(database_ids) - len(page_ids)

    @staticmethod
    def _collect_paginated_api(
        function: Callable[..., Any], limit: int, **kwargs: Any
    ) -> List[Any]:
        """Collect all results from paginated Notion API endpoint.

        Calls ``function`` repeatedly, feeding the ``next_cursor`` of each
        response back in as ``start_cursor``, until the limit is reached or
        the response reports no further pages.

        Args:
            function: API function to call
            limit: Maximum number of results to collect (None for unlimited;
                0 or negative yields an empty list without calling the API)
            **kwargs: Additional arguments for API function; an optional
                ``start_cursor`` entry is used as the initial cursor

        Returns:
            List[Any]: Collected API results
        """
        # A non-positive limit can never be satisfied by fetching; returning
        # early also avoids `result[:limit]` with a negative limit, which
        # would silently drop trailing items instead of returning nothing.
        if limit is not None and limit <= 0:
            return []

        next_cursor = kwargs.pop("start_cursor", None)
        result = []

        while True:
            response = function(**kwargs, start_cursor=next_cursor)
            # Tolerate a missing or null "results" entry.
            result.extend(response.get("results") or [])

            if NotionReader._limit_reached(result, limit):
                return result[:limit]
            if not NotionReader._has_more_pages(response):
                # The limit was not reached, so nothing needs truncating.
                return result

            next_cursor = response.get("next_cursor")

    @staticmethod
    def _limit_reached(result: List[dict], limit: int) -> bool:
        """Check if result count has reached limit.

        Args:
            result: Current results
            limit: Maximum allowed results (None for unlimited)

        Returns:
            bool: True if limit reached
        """
        return limit is not None and len(result) >= limit

    @staticmethod
    def _has_more_pages(response: dict) -> bool:
        """Check if more pages are available.

        Args:
            response: API response dictionary

        Returns:
            bool: True if ``has_more`` is set and a ``next_cursor`` is present
        """
        return bool(response.get("has_more") and response.get("next_cursor"))

__init__(configuration, notion_client, exporter)

Initialize Notion reader.

Parameters:
  • configuration (NotionDatasourceConfiguration) –

    Settings for Notion access and limits

  • notion_client (Client) –

    Client for Notion API interaction

  • exporter (NotionExporter) –

    Component for content export and conversion

Source code in src/embedding/datasources/notion/reader.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    configuration: NotionDatasourceConfiguration,
    notion_client: Client,
    exporter: NotionExporter,
):
    """Set up the reader with its collaborators and export settings.

    Args:
        configuration: Supplies export_batch_size, export_limit and
            home_page_database_id
        notion_client: Client for Notion API interaction
        exporter: Component for content export and conversion
    """
    super().__init__()

    # Collaborators handed in by the caller.
    self.notion_client = notion_client
    self.exporter = exporter

    # Values copied out of the configuration object.
    self.home_page_database_id = configuration.home_page_database_id
    self.export_limit = configuration.export_limit
    self.export_batch_size = configuration.export_batch_size

_collect_paginated_api(function, limit, **kwargs) staticmethod

Collect all results from paginated Notion API endpoint.

Parameters:
  • function (Callable[..., Any]) –

    API function to call

  • limit (int) –

    Maximum number of results to collect

  • **kwargs (Any, default: {} ) –

    Additional arguments for API function

Returns:
  • List[Any]

    List[Any]: Collected API results

Source code in src/embedding/datasources/notion/reader.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
@staticmethod
def _collect_paginated_api(
    function: Callable[..., Any], limit: int, **kwargs: Any
) -> List[Any]:
    """Collect results from a cursor-paginated Notion API endpoint.

    Calls ``function`` repeatedly, feeding each response's "next_cursor"
    back in as ``start_cursor``, until the limit is reached or the response
    reports no further pages.

    Args:
        function: API function to call; it is invoked with ``**kwargs`` plus
            a ``start_cursor`` keyword and must return a mapping
        limit: Maximum number of results to collect (None for unlimited)
        **kwargs: Additional arguments for API function; an optional
            ``start_cursor`` entry is used as the cursor for the first call

    Returns:
        List[Any]: Collected API results, at most ``limit`` items. Empty,
            without calling ``function``, when ``limit`` is zero or negative.
    """
    next_cursor = kwargs.pop("start_cursor", None)

    # A non-positive limit asks for nothing. Without this guard a negative
    # limit would reach ``result[:limit]`` and slice items off the end.
    if limit is not None and limit <= 0:
        return []

    result = []

    while True:
        response = function(**kwargs, start_cursor=next_cursor)
        # Tolerate a response without "results" instead of failing in extend().
        result.extend(response.get("results") or [])

        if NotionReader._limit_reached(result, limit):
            return result[:limit]
        if not NotionReader._has_more_pages(response):
            # Test against None explicitly so the cap is never decided by
            # the truthiness of the number.
            return result if limit is None else result[:limit]

        next_cursor = response.get("next_cursor")

_export_documents(chunked_ids, objects_type) async

Export documents in batches.

Parameters:
  • chunked_ids (List[List[str]]) –

    Batched lists of object IDs

  • objects_type (NotionObjectType) –

    Type of Notion objects to export

Returns:
  • Tuple[List[NotionDocument], List[str]]

    A tuple of two lists: the exported documents, and the IDs from batches whose export failed.

Raises:
  • ValueError

    If unsupported object type provided

Source code in src/embedding/datasources/notion/reader.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
async def _export_documents(
    self, chunked_ids: List[List[str]], objects_type: NotionObjectType
) -> Tuple[List[NotionDocument], List[str]]:
    """Export documents in batches.

    Each batch is handed to ``self.exporter.run`` as ``page_ids`` or
    ``database_ids`` depending on ``objects_type``; the other argument is
    passed as None. Batches are awaited one after another.

    Args:
        chunked_ids: Batched lists of object IDs
        objects_type: Type of Notion objects to export

    Returns:
        Tuple containing:
            - List of exported documents
            - List of failed export IDs

    Note:
        Nothing is raised for a failing batch: any exception from the
        exporter is logged and all IDs of that batch are reported as failed.
        An ``objects_type`` other than PAGE or DATABASE is not rejected
        here; the exporter is then called with both ID arguments set to None.
    """
    all_documents = []
    failed_exports = []
    num_chunks = len(chunked_ids)

    # The object type does not change between batches, so decide once.
    is_page = objects_type == NotionObjectType.PAGE
    is_database = objects_type == NotionObjectType.DATABASE

    # Count batches from 1 so progress reads "1/N" through "N/N".
    for i, chunk_ids in enumerate(chunked_ids, start=1):
        logging.info(
            f"[{i}/{num_chunks}] Exporting {objects_type.name} objects: {chunk_ids}"
        )

        try:
            documents = await self.exporter.run(
                page_ids=chunk_ids if is_page else None,
                database_ids=chunk_ids if is_database else None,
            )
            all_documents.extend(documents)
            logging.info(
                f"[{i}/{num_chunks}] Added {len(documents)} documents"
            )
        except Exception as e:
            # Deliberate best-effort: one bad batch must not abort the rest.
            logging.error(
                f"[{i}/{num_chunks}] Export failed for {objects_type.name}: {chunk_ids}. {e}"
            )
            failed_exports.extend(chunk_ids)

    return all_documents, failed_exports

_get_all_ids(objects_type, limit=None)

Fetch all IDs for specified Notion object type.

Parameters:
  • objects_type (NotionObjectType) –

    Type of Notion objects to fetch

  • limit (int, default: None ) –

    Maximum number of IDs to fetch (None for unlimited)

Returns:
  • List[str]

    List[str]: Collection of object IDs

Note

Returns empty list if limit is 0 or negative

Source code in src/embedding/datasources/notion/reader.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def _get_all_ids(
    self, objects_type: NotionObjectType, limit: int = None
) -> List[str]:
    """Look up the IDs of every Notion object of one type via search.

    Args:
        objects_type: Type of Notion objects to fetch; its lowercased name
            is used as the search filter value
        limit: Maximum number of IDs to fetch (None for unlimited)

    Returns:
        List[str]: IDs of the matching objects, at most ``limit`` of them

    Note:
        A limit of 0 or below short-circuits to an empty list before any
        search is issued.
    """
    capped = limit is not None
    if capped and limit <= 0:
        return []

    logging.info(
        f"Fetching all ids of {objects_type.name} objects from Notion with limit {limit}..."
    )

    search_filter = {
        "value": objects_type.name.lower(),
        "property": "object",
    }
    found = NotionReader._collect_paginated_api(
        self.notion_client.search, limit, filter=search_filter
    )

    object_ids = []
    for item in found:
        object_ids.append(item["id"])
    if capped:
        object_ids = object_ids[:limit]

    logging.info(
        f"Found {len(object_ids)} ids of {objects_type.name} objects in Notion."
    )

    return object_ids

_get_current_limit(database_ids, page_ids)

Calculate remaining object limit based on existing IDs.

Parameters:
  • database_ids (List[str]) –

    Currently collected database IDs

  • page_ids (List[str]) –

    Currently collected page IDs

Returns:
  • int( int ) –

    Remaining limit (None if no limit configured)

Note

Subtracts total of existing IDs from configured export limit

Source code in src/embedding/datasources/notion/reader.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def _get_current_limit(
    self, database_ids: List[str], page_ids: List[str]
) -> int:
    """Calculate remaining object limit based on existing IDs.

    Args:
        database_ids: Currently collected database IDs
        page_ids: Currently collected page IDs

    Returns:
        int: Configured export limit minus the number of IDs already
            collected, or None if no limit is configured. The value is
            zero or negative once the limit has been used up.

    Note:
        Subtracts total of existing IDs from configured export limit
    """
    # Compare against None rather than truthiness: an export limit of 0 is
    # a real limit ("export nothing"), not "unlimited".
    if self.export_limit is None:
        return None
    return self.export_limit - len(database_ids) - len(page_ids)

_get_ids_from_home_page()

Extract database and page IDs from home page database.

Queries the configured home page database and extracts IDs for both databases and pages.

Returns:
  • Tuple[List[str], List[str]]

    A tuple of two lists: the database IDs, then the page IDs.

Source code in src/embedding/datasources/notion/reader.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def _get_ids_from_home_page(self) -> Tuple[List[str], List[str]]:
    """Collect the IDs listed in the home page database, grouped by kind.

    Queries the home page database through the paginated collector, capped
    at the export limit, and sorts each returned entry by its "object"
    field. Entries that are neither "database" nor "page" are ignored.

    Returns:
        Tuple containing:
            - List of database IDs
            - List of page IDs
    """
    logging.info(
        f"Fetching all object ids from Notion's home page with limit {self.export_limit}..."
    )
    entries = self._collect_paginated_api(
        function=self.notion_client.databases.query,
        limit=self.export_limit,
        database_id=self.home_page_database_id,
    )

    database_ids = []
    page_ids = []
    for entry in entries:
        kind = entry["object"]
        if kind == "database":
            database_ids.append(entry["id"])
        elif kind == "page":
            page_ids.append(entry["id"])

    logging.info(
        f"Found {len(database_ids)} database ids and {len(page_ids)} page ids in Notion."
    )

    return database_ids, page_ids

_has_more_pages(response) staticmethod

Check if more pages are available.

Parameters:
  • response (dict) –

    API response dictionary

Returns:
  • bool( bool ) –

    True if more pages available

Source code in src/embedding/datasources/notion/reader.py
310
311
312
313
314
315
316
317
318
319
320
@staticmethod
def _has_more_pages(response: dict) -> bool:
    """Check if more pages are available.

    Args:
        response: API response dictionary

    Returns:
        bool: True only when the response carries both a truthy "has_more"
            and a truthy "next_cursor"; False otherwise
    """
    # Coerce to a real bool: a bare `and` would hand back the cursor string
    # (or None) rather than the annotated bool.
    return bool(response.get("has_more") and response.get("next_cursor"))

_limit_reached(result, limit) staticmethod

Check if result count has reached limit.

Parameters:
  • result (List[dict]) –

    Current results

  • limit (int) –

    Maximum allowed results

Returns:
  • bool( bool ) –

    True if limit reached

Source code in src/embedding/datasources/notion/reader.py
297
298
299
300
301
302
303
304
305
306
307
308
@staticmethod
def _limit_reached(result: List[dict], limit: int) -> bool:
    """Tell whether enough results have been gathered.

    Args:
        result: Results accumulated so far
        limit: Maximum number of results wanted; None means no cap

    Returns:
        bool: True when a cap is set and len(result) has met or passed it
    """
    if limit is None:
        return False
    return len(result) >= limit

get_all_documents()

Synchronous entry point for fetching all documents from the data source. The method body is empty, so it returns None; use get_all_documents_async() to retrieve documents.

Source code in src/embedding/datasources/notion/reader.py
55
56
57
58
59
def get_all_documents(self) -> List[NotionDocument]:
    """Synchronous entry point for fetching all documents.

    The body is empty: nothing is fetched and None is returned.
    """
    return None

get_all_documents_async() async

Asynchronously retrieve all documents from Notion.

Fetches pages and databases in batches, respecting export limits and batch sizes.

Returns:
  • List[NotionDocument]

    List[NotionDocument]: Collection of processed documents

Source code in src/embedding/datasources/notion/reader.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
async def get_all_documents_async(self) -> List[NotionDocument]:
    """Asynchronously retrieve all documents from Notion.

    Gathers database and page IDs (first from the home page database when
    one is configured, then from search), removes duplicates, exports the
    IDs in batches of ``export_batch_size`` and finally caps the combined
    result at ``export_limit``.

    Returns:
        List[NotionDocument]: Exported database documents followed by
            exported page documents, truncated to the export limit if set
    """
    if self.home_page_database_id is None:
        database_ids = []
        page_ids = []
    else:
        database_ids, page_ids = self._get_ids_from_home_page()

    # Each search is capped by what is left of the export limit after the
    # IDs collected so far, so the page search sees the database IDs too.
    database_ids.extend(
        self._get_all_ids(
            NotionObjectType.DATABASE,
            limit=self._get_current_limit(database_ids, page_ids),
        )
    )
    page_ids.extend(
        self._get_all_ids(
            NotionObjectType.PAGE,
            limit=self._get_current_limit(database_ids, page_ids),
        )
    )

    # Deduplicate while keeping discovery order. dict.fromkeys preserves
    # insertion order, whereas iterating a set() of strings can differ
    # between runs, which made the batches and the truncated result below
    # nondeterministic. The home page database itself is never exported.
    database_ids = [
        database_id
        for database_id in dict.fromkeys(database_ids)
        if database_id != self.home_page_database_id
    ]
    page_ids = list(dict.fromkeys(page_ids))

    # Batch and export
    chunked_database_ids = list(
        chunked(database_ids, self.export_batch_size)
    )
    chunked_page_ids = list(chunked(page_ids, self.export_batch_size))

    database_documents, database_failed = await self._export_documents(
        chunked_database_ids, NotionObjectType.DATABASE
    )
    page_documents, page_failed = await self._export_documents(
        chunked_page_ids, NotionObjectType.PAGE
    )

    # Log failures
    if database_failed:
        logging.warning(
            f"Failed to export {len(database_failed)} databases: {database_failed}"
        )
    if page_failed:
        logging.warning(
            f"Failed to export {len(page_failed)} pages: {page_failed}"
        )

    # Apply limit if needed
    documents = database_documents + page_documents
    return (
        documents
        if self.export_limit is None
        else documents[: self.export_limit]
    )

Splitter

NotionSplitter

Bases: BaseSplitter

Splitter for Notion content with separate database and page handling.

Implements content splitting for Notion documents by routing databases and pages to specialized splitters.

Attributes:
  • database_splitter

    Splitter configured for database content

  • page_splitter

    Splitter configured for page content

Source code in src/embedding/datasources/notion/splitter.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class NotionSplitter(BaseSplitter):
    """Route Notion documents to a per-type splitter.

    Database documents go to one splitter and page documents to another;
    the resulting nodes are returned as a single list.

    Attributes:
        database_splitter: Splitter used for database documents
        page_splitter: Splitter used for page documents
    """

    def __init__(
        self,
        database_splitter: MarkdownSplitter,
        page_splitter: MarkdownSplitter,
    ):
        """Store the two splitters.

        Args:
            database_splitter: MarkdownSplitter instance for databases
            page_splitter: MarkdownSplitter instance for pages
        """
        self.database_splitter, self.page_splitter = (
            database_splitter,
            page_splitter,
        )

    def split(self, documents: List[NotionDocument]) -> List[TextNode]:
        """Split Notion documents into text nodes.

        Documents are grouped by ``extra_info["type"]``; documents whose
        type is neither the database nor the page value are left out.

        Args:
            documents: Collection of Notion documents to split

        Returns:
            List[TextNode]: Nodes from database documents followed by nodes
                from page documents
        """

        def of_type(object_type):
            # Keep only the documents tagged with this object type's value.
            return [
                doc
                for doc in documents
                if doc.extra_info["type"] == object_type.value
            ]

        database_documents = of_type(NotionObjectType.DATABASE)
        page_documents = of_type(NotionObjectType.PAGE)

        # Page nodes are appended onto the list the database splitter
        # returned, so that same list object is what gets returned.
        nodes = self.database_splitter.split(database_documents)
        page_nodes = self.page_splitter.split(page_documents)
        nodes.extend(page_nodes)

        return nodes

__init__(database_splitter, page_splitter)

Initialize Notion content splitter.

Parameters:
  • database_splitter (MarkdownSplitter) –

    MarkdownSplitter instance for databases

  • page_splitter (MarkdownSplitter) –

    MarkdownSplitter instance for pages

Source code in src/embedding/datasources/notion/splitter.py
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(
    self,
    database_splitter: MarkdownSplitter,
    page_splitter: MarkdownSplitter,
):
    """Store the two splitters on the instance.

    Args:
        database_splitter: MarkdownSplitter instance for databases
        page_splitter: MarkdownSplitter instance for pages
    """
    self.database_splitter, self.page_splitter = (
        database_splitter,
        page_splitter,
    )

split(documents)

Split Notion documents into text nodes.

Separates documents by type and processes them with appropriate splitter.

Parameters:
  • documents (List[NotionDocument]) –

    Collection of Notion documents to split

Returns:
  • List[TextNode]

    List[TextNode]: Combined collection of text nodes from all documents

Source code in src/embedding/datasources/notion/splitter.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def split(self, documents: List[NotionDocument]) -> List[TextNode]:
    """Split Notion documents into text nodes.

    Documents are grouped by ``extra_info["type"]``; documents whose type
    is neither the database nor the page value are left out.

    Args:
        documents: Collection of Notion documents to split

    Returns:
        List[TextNode]: Nodes from database documents followed by nodes
            from page documents
    """

    def of_type(object_type):
        # Keep only the documents tagged with this object type's value.
        return [
            doc
            for doc in documents
            if doc.extra_info["type"] == object_type.value
        ]

    database_documents = of_type(NotionObjectType.DATABASE)
    page_documents = of_type(NotionObjectType.PAGE)

    # Page nodes are appended onto the list the database splitter returned,
    # so that same list object is what gets returned.
    nodes = self.database_splitter.split(database_documents)
    page_nodes = self.page_splitter.split(page_documents)
    nodes.extend(page_nodes)

    return nodes

Builders

NotionCleanerBuilder

Builder for creating Notion content cleaner instances.

Provides factory method to create NotionCleaner objects.

Source code in src/embedding/datasources/notion/builders.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class NotionCleanerBuilder:
    """Factory for Notion content cleaners.

    Exposes a single injectable static method that returns a NotionCleaner.
    """

    @staticmethod
    @inject
    def build() -> NotionCleaner:
        """Create a NotionCleaner.

        Returns:
            NotionCleaner: Newly constructed cleaner (takes no arguments)
        """
        cleaner = NotionCleaner()
        return cleaner

build() staticmethod

Creates a content cleaner for Notion.

Returns:
  • NotionCleaner( NotionCleaner ) –

    Configured cleaner instance

Source code in src/embedding/datasources/notion/builders.py
129
130
131
132
133
134
135
136
137
@staticmethod
@inject
def build() -> NotionCleaner:
    """Create a NotionCleaner.

    Returns:
        NotionCleaner: Newly constructed cleaner (takes no arguments)
    """
    cleaner = NotionCleaner()
    return cleaner

NotionClientBuilder

Builder for creating Notion API client instances.

Provides factory method to create configured Notion API clients.

Source code in src/embedding/datasources/notion/builders.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class NotionClientBuilder:
    """Factory for Notion API clients.

    Exposes a single injectable static method that returns a Client
    authenticated with the token held in the datasource configuration.
    """

    @staticmethod
    @inject
    def build(configuration: NotionDatasourceConfiguration) -> Client:
        """Create a Notion API client.

        Args:
            configuration: Notion settings; the API token is read from
                ``configuration.secrets.api_token``

        Returns:
            Client: Client constructed with that token as ``auth``
        """
        api_token = configuration.secrets.api_token.get_secret_value()
        return Client(auth=api_token)

build(configuration) staticmethod

Creates a configured Notion API client.

Parameters:
  • configuration (NotionDatasourceConfiguration) –

    Notion authentication settings

Returns:
  • Client( Client ) –

    Configured API client instance

Source code in src/embedding/datasources/notion/builders.py
87
88
89
90
91
92
93
94
95
96
97
98
@staticmethod
@inject
def build(configuration: NotionDatasourceConfiguration) -> Client:
    """Create a Notion API client.

    Args:
        configuration: Notion settings; the API token is read from
            ``configuration.secrets.api_token``

    Returns:
        Client: Client constructed with that token as ``auth``
    """
    api_token = configuration.secrets.api_token.get_secret_value()
    return Client(auth=api_token)

NotionDatasourceManagerBuilder

Builder for creating Notion datasource manager instances.

Provides factory method to create configured NotionDatasourceManager with required components for content processing.

Source code in src/embedding/datasources/notion/builders.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class NotionDatasourceManagerBuilder:
    """Factory for Notion datasource managers.

    Exposes a single injectable static method that wires a configuration,
    reader, cleaner and splitter into a NotionDatasourceManager.
    """

    @staticmethod
    @inject
    def build(
        configuration: NotionDatasourceConfiguration,
        reader: NotionReader,
        cleaner: NotionCleaner,
        splitter: NotionSplitter,
    ) -> NotionDatasourceManager:
        """Create a Notion datasource manager.

        Args:
            configuration: Notion access and processing settings
            reader: Component for reading Notion content
            cleaner: Component for cleaning raw content
            splitter: Component for splitting content into chunks

        Returns:
            NotionDatasourceManager: Manager built from the four arguments,
                each passed through by keyword
        """
        components = {
            "configuration": configuration,
            "reader": reader,
            "cleaner": cleaner,
            "splitter": splitter,
        }
        return NotionDatasourceManager(**components)

build(configuration, reader, cleaner, splitter) staticmethod

Creates a configured Notion datasource manager.

Parameters:
  • configuration (NotionDatasourceConfiguration) –

    Notion access and processing settings

  • reader (NotionReader) –

    Component for reading Notion content

  • cleaner (NotionCleaner) –

    Component for cleaning raw content

  • splitter (NotionSplitter) –

    Component for splitting content into chunks

Returns:
  • NotionDatasourceManager( NotionDatasourceManager ) –

    Configured manager instance

Source code in src/embedding/datasources/notion/builders.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@staticmethod
@inject
def build(
    configuration: NotionDatasourceConfiguration,
    reader: NotionReader,
    cleaner: NotionCleaner,
    splitter: NotionSplitter,
) -> NotionDatasourceManager:
    """Create a Notion datasource manager.

    Args:
        configuration: Notion access and processing settings
        reader: Component for reading Notion content
        cleaner: Component for cleaning raw content
        splitter: Component for splitting content into chunks

    Returns:
        NotionDatasourceManager: Manager built from the four arguments,
            each passed through by keyword
    """
    components = {
        "configuration": configuration,
        "reader": reader,
        "cleaner": cleaner,
        "splitter": splitter,
    }
    return NotionDatasourceManager(**components)

NotionExporterBuilder

Builder for creating Notion content exporter instances.

Provides factory method to create configured NotionExporter objects.

Source code in src/embedding/datasources/notion/builders.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class NotionExporterBuilder:
    """Factory for Notion content exporters.

    Exposes a single injectable static method that returns a NotionExporter
    set up with the token held in the datasource configuration.
    """

    @staticmethod
    @inject
    def build(configuration: NotionDatasourceConfiguration) -> NotionExporter:
        """Create a Notion content exporter.

        Args:
            configuration: Notion settings; the API token is read from
                ``configuration.secrets.api_token``

        Returns:
            NotionExporter: Exporter constructed with that token as
                ``api_token``
        """
        api_token = configuration.secrets.api_token.get_secret_value()
        return NotionExporter(api_token=api_token)

build(configuration) staticmethod

Creates a configured Notion content exporter.

Parameters:
  • configuration (NotionDatasourceConfiguration) –

    Notion authentication settings

Returns:
  • NotionExporter( NotionExporter ) –

    Configured exporter instance

Source code in src/embedding/datasources/notion/builders.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@staticmethod
@inject
def build(configuration: NotionDatasourceConfiguration) -> NotionExporter:
    """Create a Notion content exporter.

    Args:
        configuration: Notion settings; the API token is read from
            ``configuration.secrets.api_token``

    Returns:
        NotionExporter: Exporter constructed with that token as
            ``api_token``
    """
    api_token = configuration.secrets.api_token.get_secret_value()
    return NotionExporter(api_token=api_token)

NotionReaderBuilder

Builder for creating Notion reader instances.

Provides factory method to create configured NotionReader objects.

Source code in src/embedding/datasources/notion/builders.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class NotionReaderBuilder:
    """Factory for Notion readers.

    Exposes a single injectable static method that wires a configuration,
    API client and exporter into a NotionReader.
    """

    @staticmethod
    @inject
    def build(
        configuration: NotionDatasourceConfiguration,
        notion_client: Client,
        exporter: NotionExporter,
    ) -> NotionReader:
        """Create a Notion reader.

        Args:
            configuration: Notion access settings
            notion_client: Client for Notion API interaction
            exporter: Component for content export

        Returns:
            NotionReader: Reader built from the three arguments, each
                passed through by keyword
        """
        components = {
            "configuration": configuration,
            "notion_client": notion_client,
            "exporter": exporter,
        }
        return NotionReader(**components)

build(configuration, notion_client, exporter) staticmethod

Creates a configured Notion reader.

Parameters:
  • configuration (NotionDatasourceConfiguration) –

    Notion access settings

  • notion_client (Client) –

    Client for Notion API interaction

  • exporter (NotionExporter) –

    Component for content export

Returns:
  • NotionReader( NotionReader ) –

    Configured reader instance

Source code in src/embedding/datasources/notion/builders.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@staticmethod
@inject
def build(
    configuration: NotionDatasourceConfiguration,
    notion_client: Client,
    exporter: NotionExporter,
) -> NotionReader:
    """Create a Notion reader.

    Args:
        configuration: Notion access settings
        notion_client: Client for Notion API interaction
        exporter: Component for content export

    Returns:
        NotionReader: Reader built from the three arguments, each passed
            through by keyword
    """
    components = {
        "configuration": configuration,
        "notion_client": notion_client,
        "exporter": exporter,
    }
    return NotionReader(**components)

NotionSplitterBuilder

Builder for creating Notion content splitter instances.

Provides factory method to create configured NotionSplitter objects with separate splitters for databases and pages.

Source code in src/embedding/datasources/notion/builders.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class NotionSplitterBuilder:
    """Factory for Notion content splitters.

    Exposes a single injectable static method that pairs a database
    splitter with a page splitter inside a NotionSplitter.
    """

    @staticmethod
    @inject
    def build(
        database_splitter: BoundEmbeddingModelMarkdownSplitter,
        page_splitter: BoundEmbeddingModelMarkdownSplitter,
    ) -> NotionSplitter:
        """Create a Notion content splitter.

        Args:
            database_splitter: Splitter for database content
            page_splitter: Splitter for page content

        Returns:
            NotionSplitter: Splitter built from the two arguments, each
                passed through by keyword
        """
        splitter = NotionSplitter(
            database_splitter=database_splitter,
            page_splitter=page_splitter,
        )
        return splitter

build(database_splitter, page_splitter) staticmethod

Creates a configured Notion content splitter.

Parameters:
  • database_splitter (BoundEmbeddingModelMarkdownSplitter) –

    Splitter for database content

  • page_splitter (BoundEmbeddingModelMarkdownSplitter) –

    Splitter for page content

Returns:
  • NotionSplitter( NotionSplitter ) –

    Configured splitter instance

Source code in src/embedding/datasources/notion/builders.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
@staticmethod
@inject
def build(
    database_splitter: BoundEmbeddingModelMarkdownSplitter,
    page_splitter: BoundEmbeddingModelMarkdownSplitter,
) -> NotionSplitter:
    """Create a Notion content splitter.

    Args:
        database_splitter: Splitter for database content
        page_splitter: Splitter for page content

    Returns:
        NotionSplitter: Splitter built from the two arguments, each passed
            through by keyword
    """
    splitter = NotionSplitter(
        database_splitter=database_splitter,
        page_splitter=page_splitter,
    )
    return splitter