Notion Datasource

This module contains functionality related to the Notion datasource.

Cleaner

NotionDatasourceCleaner

Bases: BasicMarkdownCleaner[NotionDocument]

Cleaner for Notion document content.

Implements cleaning logic for Notion databases and pages, removing HTML tags and comments while preserving meaningful content.

Note

Expects documents to be in markdown format.

Source code in src/extraction/datasources/notion/cleaner.py
class NotionDatasourceCleaner(BasicMarkdownCleaner[NotionDocument]):
    """Cleaner for Notion document content.

    Implements cleaning logic for Notion databases and pages, removing HTML
    tags and comments while preserving meaningful content.

    Note:
        Expects documents to be in markdown format.
    """

    def clean(self, document: NotionDocument) -> NotionDocument:
        """Clean a single Notion document.

        Processes the document based on its type (database or page),
        removing HTML artifacts and cleaning the content.

        Args:
            document: Notion document to clean

        Returns:
            NotionDocument: Cleaned document, or None if content is empty after cleaning
        """
        if document.metadata["type"] == "database":
            cleaned_text = self._clean_database(document)
            document.set_content(cleaned_text)
        if document.metadata["type"] == "page":
            cleaned_text = self._clean_page(document)
            document.set_content(cleaned_text)

        if self._has_empty_content(document):
            return None

        return document

    def _clean_database(self, document: NotionDocument) -> str:
        """Clean Notion database content.

        Extracts and cleans the text content from a Notion database document,
        processing any embedded HTML elements.

        Args:
            document: Database document to clean

        Returns:
            str: Cleaned database content as markdown text
        """
        return NotionDatasourceCleaner._parse_html_in_markdown(document.text)

    def _clean_page(self, document: NotionDocument) -> str:
        """Clean Notion page content.

        Extracts and cleans the text content from a Notion page document,
        processing any embedded HTML elements.

        Args:
            document: Page document to clean

        Returns:
            str: Cleaned page content as markdown text
        """
        return NotionDatasourceCleaner._parse_html_in_markdown(document.text)

    @staticmethod
    def _parse_html_in_markdown(md_text: str) -> str:
        """Process HTML elements within markdown content.

        Performs three main cleaning operations:
        1. Removes HTML comments completely
        2. Converts HTML tags to markdown format
        3. Removes elements that don't contain alphanumeric characters

        Args:
            md_text: Text containing markdown and HTML

        Returns:
            str: Cleaned markdown text with HTML properly converted or removed

        Note:
            Uses BeautifulSoup for HTML parsing and markdownify for HTML-to-markdown conversion
        """

        def replace_html(match):
            html_content = match.group(0)
            soup = BeautifulSoup(html_content, "html.parser")
            markdown = md(str(soup))

            if not re.search(r"[a-zA-Z0-9]", markdown):
                return ""
            return markdown

        md_text = re.sub(r"<!--.*?-->", "", md_text, flags=re.DOTALL)
        html_block_re = re.compile(r"<.*?>", re.DOTALL)
        return re.sub(html_block_re, replace_html, md_text)
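
A minimal usage sketch of the cleaner follows. The document construction mirrors the one used by NotionDatasourceParser below (NotionDocument(text=..., metadata=...)); the content and metadata values shown are placeholders, and the sketch assumes set_content() writes the cleaned markdown back to the document's text attribute.

# Illustrative sketch; content and metadata values are placeholders.
cleaner = NotionDatasourceCleaner()

document = NotionDocument(
    text="# Title\n<!-- internal reviewer note -->\n<b>Bold</b> text and plain markdown.",
    metadata={"type": "page"},
)

cleaned = cleaner.clean(document)
if cleaned is not None:
    # HTML comments are removed entirely and HTML tags are converted or
    # stripped; documents left empty by cleaning come back as None.
    print(cleaned.text)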

clean(document)

Clean a single Notion document.

Processes the document based on its type (database or page), removing HTML artifacts and cleaning the content.

Parameters:
  • document (NotionDocument) –

    Notion document to clean

Returns:
  • NotionDocument –

    Cleaned document, or None if content is empty after cleaning

Source code in src/extraction/datasources/notion/cleaner.py
def clean(self, document: NotionDocument) -> NotionDocument:
    """Clean a single Notion document.

    Processes the document based on its type (database or page),
    removing HTML artifacts and cleaning the content.

    Args:
        document: Notion document to clean

    Returns:
        NotionDocument: Cleaned document, or None if content is empty after cleaning
    """
    if document.metadata["type"] == "database":
        cleaned_text = self._clean_database(document)
        document.set_content(cleaned_text)
    if document.metadata["type"] == "page":
        cleaned_text = self._clean_page(document)
        document.set_content(cleaned_text)

    if self._has_empty_content(document):
        return None

    return document

NotionDatasourceCleanerFactory

Bases: Factory

Factory for creating NotionDatasourceCleaner instances.

This factory is responsible for creating instances of NotionDatasourceCleaner with the appropriate configuration.

Source code in src/extraction/datasources/notion/cleaner.py
class NotionDatasourceCleanerFactory(Factory):
    """Factory for creating NotionDatasourceCleaner instances.

    This factory is responsible for creating instances of NotionDatasourceCleaner
    with the appropriate configuration.
    """

    _configuration_class = NotionDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, _: NotionDatasourceConfiguration
    ) -> NotionDatasourceCleaner:
        """Create a new instance of NotionDatasourceCleaner.

        Args:
            configuration: Configuration for the cleaner

        Returns:
            NotionDatasourceCleaner: Instance of NotionDatasourceCleaner
        """
        return NotionDatasourceCleaner()

Client

NotionClientFactory

Bases: SingletonFactory

Factory for creating and managing Notion API client instances.

This singleton factory ensures that only one Notion client instance is created for a specific configuration, promoting resource efficiency and consistency. Client instances are created using the Notion API authentication token from the provided configuration.

The factory follows the singleton pattern to prevent multiple instantiations of clients with identical configurations.

Attributes:
  • _configuration_class (Type) –

    Type of configuration object used to create the client

Source code in src/extraction/datasources/notion/client.py
class NotionClientFactory(SingletonFactory):
    """Factory for creating and managing Notion API client instances.

    This singleton factory ensures that only one Notion client instance is created
    for a specific configuration, promoting resource efficiency and consistency.
    Client instances are created using the Notion API authentication token
    from the provided configuration.

    The factory follows the singleton pattern to prevent multiple instantiations
    of clients with identical configurations.

    Attributes:
        _configuration_class: Type of configuration object used to create the client
    """

    _configuration_class: Type = NotionDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: NotionDatasourceConfiguration
    ) -> Client:
        """Create a new instance of the Notion API client.

        This method extracts the API token from the provided configuration's
        secrets and uses it to authenticate a new Notion client.

        Args:
            configuration: Configuration object containing Notion API credentials
                           and other settings.

        Returns:
            A configured Notion API client instance ready for making API calls.
        """
        return Client(auth=configuration.secrets.api_token.get_secret_value())
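
As an illustrative sketch, a client is obtained through the factory as shown below; the way NotionDatasourceConfiguration and its secrets are constructed here is an assumption and may differ from the real configuration model.

# Illustrative sketch; the configuration construction is an assumption
# and the token is a placeholder.
configuration = NotionDatasourceConfiguration(
    secrets={"api_token": "secret_xxx"},
)

client = NotionClientFactory.create(configuration)

# Because this is a SingletonFactory, repeated calls with the same
# configuration are documented to reuse the same client instance.
results = client.search(filter={"value": "page", "property": "object"})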

Configuration

Document

NotionDocument

Bases: BaseDocument

Document representation for Notion page content.

Extends BaseDocument to handle Notion-specific document processing including metadata handling and filtering for embeddings and LLM contexts.

Source code in src/extraction/datasources/notion/document.py
class NotionDocument(BaseDocument):
    """Document representation for Notion page content.

    Extends BaseDocument to handle Notion-specific document processing including
    metadata handling and filtering for embeddings and LLM contexts.
    """

    pass

Exporter

NotionExporter

Exporter for converting Notion pages to markdown documents.

Provides a high-level interface for extracting Notion content and converting it to structured NotionDocument instances.

Source code in src/extraction/datasources/notion/exporter.py
class NotionExporter:
    """Exporter for converting Notion pages to markdown documents.

    Provides a high-level interface for extracting Notion content
    and converting it to structured NotionDocument instances.
    """

    def __init__(
        self,
        api_token: str,
    ):
        """Initialize Notion exporter.

        Args:
            api_token: Authentication token for Notion API
        """
        self.notion_exporter = _NotionExporterCore(
            notion_token=api_token,
            export_child_pages=False,
            extract_page_metadata=True,
        )

    async def run(
        self, page_ids: List[str] = None, database_ids: List[str] = None
    ) -> List[NotionDocument]:
        """Export Notion content to document collection.

        Extracts content from specified pages and databases and
        converts them to structured document objects.

        Args:
            page_ids: List of page IDs to export
            database_ids: List of database IDs to export

        Returns:
            List of NotionDocument objects containing content and metadata

        Raises:
            ValueError: If neither page_ids nor database_ids provided
        """
        extracted_objects = await self.notion_exporter.async_export_pages(
            page_ids=page_ids, database_ids=database_ids
        )

        objects = []
        for object_id, extracted_data in extracted_objects.items():
            objects.append(
                {
                    "metadata": extracted_data["metadata"],
                    "markdown": extracted_data["content"],
                }
            )

        return objects
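
A minimal invocation sketch is shown below. Note that, as implemented above, run() returns a list of plain dictionaries with "metadata" and "markdown" keys, which NotionDatasourceParser later turns into NotionDocument objects; the token and IDs are placeholders.

import asyncio

# Illustrative sketch; the token and IDs are placeholders.
exporter = NotionExporter(api_token="secret_xxx")

async def export_selection():
    raw_objects = await exporter.run(
        page_ids=["<page-id>"],
        database_ids=["<database-id>"],
    )
    for raw in raw_objects:
        # Each exported object carries its Notion metadata and markdown body.
        print(raw["metadata"], raw["markdown"][:80])

asyncio.run(export_selection())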

__init__(api_token)

Initialize Notion exporter.

Parameters:
  • api_token (str) –

    Authentication token for Notion API

Source code in src/extraction/datasources/notion/exporter.py
def __init__(
    self,
    api_token: str,
):
    """Initialize Notion exporter.

    Args:
        api_token: Authentication token for Notion API
    """
    self.notion_exporter = _NotionExporterCore(
        notion_token=api_token,
        export_child_pages=False,
        extract_page_metadata=True,
    )

run(page_ids=None, database_ids=None) async

Export Notion content to document collection.

Extracts content from specified pages and databases and converts them to structured document objects.

Parameters:
  • page_ids (List[str], default: None ) –

    List of page IDs to export

  • database_ids (List[str], default: None ) –

    List of database IDs to export

Returns:
  • List[NotionDocument]

    List of NotionDocument objects containing content and metadata

Raises:
  • ValueError

    If neither page_ids nor database_ids provided

Source code in src/extraction/datasources/notion/exporter.py
async def run(
    self, page_ids: List[str] = None, database_ids: List[str] = None
) -> List[NotionDocument]:
    """Export Notion content to document collection.

    Extracts content from specified pages and databases and
    converts them to structured document objects.

    Args:
        page_ids: List of page IDs to export
        database_ids: List of database IDs to export

    Returns:
        List of NotionDocument objects containing content and metadata

    Raises:
        ValueError: If neither page_ids nor database_ids provided
    """
    extracted_objects = await self.notion_exporter.async_export_pages(
        page_ids=page_ids, database_ids=database_ids
    )

    objects = []
    for object_id, extracted_data in extracted_objects.items():
        objects.append(
            {
                "metadata": extracted_data["metadata"],
                "markdown": extracted_data["content"],
            }
        )

    return objects

NotionExporterFactory

Bases: SingletonFactory

Factory for creating NotionExporter instances.

Ensures only one instance of NotionExporter is created and reused throughout the application, following the singleton pattern.

Source code in src/extraction/datasources/notion/exporter.py
class NotionExporterFactory(SingletonFactory):
    """Factory for creating NotionExporter instances.

    Ensures only one instance of NotionExporter is created and reused
    throughout the application, following the singleton pattern.
    """

    _configuration_class = NotionDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: NotionDatasourceConfiguration
    ) -> NotionExporter:
        """Create a NotionExporter instance with the given configuration.

        Args:
            configuration: Configuration containing Notion API token

        Returns:
            Configured NotionExporter instance
        """
        return NotionExporter(
            configuration.secrets.api_token.get_secret_value()
        )

Manager

NotionDatasourceManager

Bases: BaseDatasourceManager[NotionDocument]

Manager for handling Notion datasource extraction and processing.

This class coordinates the reading, parsing, and cleaning of Notion content to produce structured NotionDocument objects ready for further processing.

Source code in src/extraction/datasources/notion/manager.py
class NotionDatasourceManager(BaseDatasourceManager[NotionDocument]):
    """Manager for handling Notion datasource extraction and processing.

    This class coordinates the reading, parsing, and cleaning of Notion content
    to produce structured NotionDocument objects ready for further processing.
    """

    def __init__(
        self,
        configuration: NotionDatasourceConfiguration,
        reader: NotionDatasourceReader,
        parser: NotionDatasourceParser,
        cleaner: NotionDatasourceCleaner,
    ):
        """Initialize the Notion datasource manager.

        Args:
            configuration: Configuration for the Notion datasource
            reader: Component responsible for fetching data from Notion
            parser: Component responsible for parsing Notion data
            cleaner: Component responsible for cleaning parsed Notion documents
        """
        self.configuration = configuration
        self.reader = reader
        self.parser = parser
        self.cleaner = cleaner

    def incremental_sync(self):
        """
        Not implemented.
        """
        raise NotImplementedError("Currently unsupported feature.")

    async def full_refresh_sync(
        self,
    ) -> AsyncIterator[NotionDocument]:
        """Perform a full refresh of all documents from the Notion datasource.

        This method reads all objects from the Notion datasource, parses them
        into documents, cleans them, and yields the cleaned documents.

        Returns:
            An async iterator of cleaned NotionDocument objects
        """
        objects = await self.reader.read_all_async()
        for object in objects:
            document = self.parser.parse(object)
            cleaned_document = self.cleaner.clean(document)
            if cleaned_document:
                yield cleaned_document
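
The async generator is typically consumed as sketched below, assuming the manager is assembled through NotionDatasourceManagerFactory (documented further down) from an existing configuration object.

import asyncio

# Illustrative sketch; `configuration` is assumed to be a ready
# NotionDatasourceConfiguration instance.
async def extract_all(configuration: NotionDatasourceConfiguration) -> None:
    manager = NotionDatasourceManagerFactory.create(configuration)
    async for document in manager.full_refresh_sync():
        # Each yielded item is a cleaned NotionDocument with metadata
        # enriched by the parser (e.g. "datasource", "created_date").
        print(document.metadata["datasource"], document.metadata["created_date"])

asyncio.run(extract_all(configuration))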

__init__(configuration, reader, parser, cleaner)

Initialize the Notion datasource manager.

Parameters:
  • configuration (NotionDatasourceConfiguration) –

    Configuration for the Notion datasource

  • reader (NotionDatasourceReader) –

    Component responsible for fetching data from Notion

  • parser (NotionDatasourceParser) –

    Component responsible for parsing Notion data

  • cleaner (NotionDatasourceCleaner) –

    Component responsible for cleaning parsed Notion documents

Source code in src/extraction/datasources/notion/manager.py
def __init__(
    self,
    configuration: NotionDatasourceConfiguration,
    reader: NotionDatasourceReader,
    parser: NotionDatasourceParser,
    cleaner: NotionDatasourceCleaner,
):
    """Initialize the Notion datasource manager.

    Args:
        configuration: Configuration for the Notion datasource
        reader: Component responsible for fetching data from Notion
        parser: Component responsible for parsing Notion data
        cleaner: Component responsible for cleaning parsed Notion documents
    """
    self.configuration = configuration
    self.reader = reader
    self.parser = parser
    self.cleaner = cleaner

full_refresh_sync() async

Perform a full refresh of all documents from the Notion datasource.

This method reads all objects from the Notion datasource, parses them into documents, cleans them, and yields the cleaned documents.

Returns:
  • AsyncIterator[NotionDocument]

    An async iterator of cleaned NotionDocument objects

Source code in src/extraction/datasources/notion/manager.py
async def full_refresh_sync(
    self,
) -> AsyncIterator[NotionDocument]:
    """Perform a full refresh of all documents from the Notion datasource.

    This method reads all objects from the Notion datasource, parses them
    into documents, cleans them, and yields the cleaned documents.

    Returns:
        An async iterator of cleaned NotionDocument objects
    """
    objects = await self.reader.read_all_async()
    for object in objects:
        document = self.parser.parse(object)
        cleaned_document = self.cleaner.clean(document)
        if cleaned_document:
            yield cleaned_document

incremental_sync()

Not implemented.

Source code in src/extraction/datasources/notion/manager.py
def incremental_sync(self):
    """
    Not implemented.
    """
    raise NotImplementedError("Currently unsupported feature.")

NotionDatasourceManagerFactory

Bases: Factory

Factory for creating NotionDatasourceManager instances.

This factory is responsible for creating instances of the NotionDatasourceManager class, which manages the extraction and processing of content from Notion databases and pages.

Attributes:
  • _configuration_class

    Type of configuration object for Notion datasource

Source code in src/extraction/datasources/notion/manager.py
class NotionDatasourceManagerFactory(Factory):
    """Factory for creating NotionDatasourceManager instances.

    This factory is responsible for creating instances of the
    NotionDatasourceManager class, which manages the extraction and
    processing of content from Notion databases and pages.

    Attributes:
        _configuration_class: Type of configuration object for Notion datasource
    """

    _configuration_class = NotionDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls,
        configuration: NotionDatasourceConfiguration,
    ) -> NotionDatasourceManager:
        """Create a new instance of NotionDatasourceManager.

        This method creates all necessary components (reader, parser, cleaner)
        and assembles them into a NotionDatasourceManager instance.

        Args:
            configuration: Configuration object for the Notion datasource

        Returns:
            A fully configured NotionDatasourceManager instance
        """
        reader = NotionDatasourceReaderFactory.create(configuration)
        parser = NotionDatasourceParserFactory.create(configuration)
        cleaner = NotionDatasourceCleanerFactory.create(configuration)
        return NotionDatasourceManager(
            configuration=configuration,
            reader=reader,
            parser=parser,
            cleaner=cleaner,
        )

Parser

NotionDatasourceParser

Bases: BaseParser[NotionDocument]

Parser for Notion content.

Transforms raw Notion page data into structured NotionDocument objects.

Source code in src/extraction/datasources/notion/parser.py
class NotionDatasourceParser(BaseParser[NotionDocument]):
    """Parser for Notion content.

    Transforms raw Notion page data into structured NotionDocument objects.
    """

    def __init__(self):
        """Initialize the Notion parser."""
        pass

    def parse(self, object: str) -> NotionDocument:
        """Parse Notion page data into a structured document.

        Args:
            object: Dictionary containing Notion page content with 'markdown' text
                   and 'metadata' information.

        Returns:
            A NotionDocument containing the parsed content and enhanced metadata.
        """
        markdown = object["markdown"]
        metadata = self._extract_metadata(object["metadata"])
        return NotionDocument(text=markdown, metadata=metadata)

    @staticmethod
    def _extract_metadata(metadata: dict) -> dict:
        """Process and enhance page metadata.

        Args:
            metadata: Raw page metadata dictionary

        Returns:
            dict: Enhanced metadata including source and formatted dates
        """
        metadata["datasource"] = "notion"
        metadata["created_date"] = metadata["created_time"].split("T")[0]
        metadata["last_edited_date"] = metadata["last_edited_time"].split("T")[
            0
        ]
        return metadata
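
A short sketch of the parser on a raw exporter object follows. The metadata keys mirror what _extract_metadata expects ("created_time", "last_edited_time"), plus the "type" key consumed later by the cleaner; all values are placeholders.

parser = NotionDatasourceParser()

# Illustrative sketch; values are placeholders.
raw_object = {
    "markdown": "# Meeting notes\nAgenda and decisions...",
    "metadata": {
        "type": "page",
        "created_time": "2024-01-15T09:30:00.000Z",
        "last_edited_time": "2024-02-01T12:00:00.000Z",
    },
}

document = parser.parse(raw_object)
print(document.metadata["datasource"])        # "notion"
print(document.metadata["created_date"])      # "2024-01-15"
print(document.metadata["last_edited_date"])  # "2024-02-01"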

__init__()

Initialize the Notion parser.

Source code in src/extraction/datasources/notion/parser.py
def __init__(self):
    """Initialize the Notion parser."""
    pass

parse(object)

Parse Notion page data into a structured document.

Parameters:
  • object (str) –

    Dictionary containing Notion page content with 'markdown' text and 'metadata' information.

Returns:
  • NotionDocument

    A NotionDocument containing the parsed content and enhanced metadata.

Source code in src/extraction/datasources/notion/parser.py
def parse(self, object: str) -> NotionDocument:
    """Parse Notion page data into a structured document.

    Args:
        object: Dictionary containing Notion page content with 'markdown' text
               and 'metadata' information.

    Returns:
        A NotionDocument containing the parsed content and enhanced metadata.
    """
    markdown = object["markdown"]
    metadata = self._extract_metadata(object["metadata"])
    return NotionDocument(text=markdown, metadata=metadata)

NotionDatasourceParserFactory

Bases: Factory

Factory for creating NotionDatasourceParser instances.

Creates and configures parser instances for Notion content.

Source code in src/extraction/datasources/notion/parser.py
class NotionDatasourceParserFactory(Factory):
    """Factory for creating NotionDatasourceParser instances.

    Creates and configures parser instances for Notion content.
    """

    _configuration_class: NotionDatasourceConfiguration = (
        NotionDatasourceConfiguration
    )

    @classmethod
    def _create_instance(
        cls, _: NotionDatasourceConfiguration
    ) -> NotionDatasourceParser:
        """
        Create a NotionDatasourceParser instance.
        Returns:
            NotionDatasourceParser: Instance of NotionDatasourceParser.
        """
        return NotionDatasourceParser()

Reader

NotionDatasourceReader

Bases: BaseReader

Reader for extracting documents from Notion workspace.

Implements document extraction from Notion pages and databases with support for batched async operations and export limits.

Source code in src/extraction/datasources/notion/reader.py
class NotionDatasourceReader(BaseReader):
    """Reader for extracting documents from Notion workspace.

    Implements document extraction from Notion pages and databases with
    support for batched async operations and export limits.
    """

    def __init__(
        self,
        configuration: NotionDatasourceConfiguration,
        client: Client,
        exporter: NotionExporter,
        logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
    ):
        """Initialize Notion reader.

        Args:
            configuration: Settings for Notion access and limits
            client: Client for Notion API interaction
            exporter: Component for content export and conversion
            logger: Logger for logging messages and errors
        """
        super().__init__()
        self.client = client
        self.export_batch_size = configuration.export_batch_size
        self.export_limit = configuration.export_limit
        self.exporter = exporter
        self.home_page_database_id = configuration.home_page_database_id
        self.logger = logger

    async def read_all_async(self) -> List[NotionDocument]:
        """Asynchronously retrieve all documents from Notion.

        Fetches pages and databases in batches, respecting export limits
        and batch sizes.

        Returns:
            List[NotionDocument]: Collection of processed documents
        """
        if self.home_page_database_id is None:
            database_ids = []
            page_ids = []
        else:
            database_ids, page_ids = self._get_ids_from_home_page()

        database_ids.extend(
            self._get_all_ids(
                NotionObjectType.DATABASE,
                limit=self._get_current_limit(database_ids, page_ids),
            )
        )
        page_ids.extend(
            self._get_all_ids(
                NotionObjectType.PAGE,
                limit=self._get_current_limit(database_ids, page_ids),
            )
        )

        # Process IDs
        database_ids = set(database_ids)
        database_ids.discard(self.home_page_database_id)
        page_ids = set(page_ids)

        # Batch and export
        chunked_database_ids = list(
            chunked(database_ids, self.export_batch_size)
        )
        chunked_page_ids = list(chunked(page_ids, self.export_batch_size))

        databases, databases_failed = await self._export_documents(
            chunked_database_ids, NotionObjectType.DATABASE
        )
        pages, pages_failed = await self._export_documents(
            chunked_page_ids, NotionObjectType.PAGE
        )

        # Log failures
        if databases_failed:
            self.logger.warning(
                f"Failed to export {len(databases_failed)} databases: {databases_failed}"
            )
        if pages_failed:
            self.logger.warning(
                f"Failed to export {len(pages_failed)} pages: {pages_failed}"
            )

        # Apply limit if needed
        objects = databases + pages
        return (
            objects
            if self.export_limit is None
            else objects[: self.export_limit]
        )

    async def _export_documents(
        self, chunked_ids: List[List[str]], objects_type: NotionObjectType
    ) -> Tuple[List[NotionDocument], List[str]]:
        """Export Notion documents in batches with progress tracking.

        Processes batches of Notion object IDs, exporting them through the exporter
        component. Handles errors gracefully by tracking failed exports and
        continuing with the next batch.

        Args:
            chunked_ids: List of ID batches, where each batch is a list of IDs
                         to be processed together
            objects_type: Type of Notion objects to export (PAGE or DATABASE)

        Returns:
            Tuple containing:
                - List of successfully exported NotionDocument objects
                - List of IDs that failed during export

        Raises:
            ValueError: If objects_type is not a valid NotionObjectType
        """
        all_objects = []
        failed_exports = []
        total_ids = sum(len(chunk) for chunk in chunked_ids)

        with tqdm(
            total=total_ids,
            desc=f"[Notion] Exporting {objects_type.name}s",
            unit="objects",
        ) as pbar:
            for chunk_ids in chunked_ids:
                try:
                    objects = await self.exporter.run(
                        page_ids=(
                            chunk_ids
                            if objects_type == NotionObjectType.PAGE
                            else None
                        ),
                        database_ids=(
                            chunk_ids
                            if objects_type == NotionObjectType.DATABASE
                            else None
                        ),
                    )
                    all_objects.extend(objects)
                    pbar.update(len(chunk_ids))
                    self.logger.debug(
                        f"Added {len(objects)} {objects_type.name}s"
                    )
                except Exception as e:
                    self.logger.error(
                        f"Export failed for {objects_type.name}: {chunk_ids}. {e}"
                    )
                    failed_exports.extend(chunk_ids)
                    pbar.update(len(chunk_ids))

        if failed_exports:
            self.logger.warning(
                f"Failed to export {len(failed_exports)} {objects_type.name}s"
            )

        return all_objects, failed_exports

    def _get_ids_from_home_page(self) -> Tuple[List[str], List[str]]:
        """Extract database and page IDs from home page database.

        Queries the configured home page database and extracts IDs for
        both databases and pages.

        Returns:
            Tuple containing:
                - List of database IDs
                - List of page IDs
        """
        self.logger.info(
            f"Fetching all object ids from Notion's home page with limit {self.export_limit}..."
        )
        response = self._collect_paginated_api(
            function=self.client.databases.query,
            limit=self.export_limit,
            database_id=self.home_page_database_id,
        )
        database_ids = [
            entry["id"] for entry in response if entry["object"] == "database"
        ]
        page_ids = [
            entry["id"] for entry in response if entry["object"] == "page"
        ]

        self.logger.info(
            f"Found {len(database_ids)} database ids and {len(page_ids)} page ids in Notion."
        )

        return database_ids, page_ids

    def _get_all_ids(
        self, objects_type: NotionObjectType, limit: int = None
    ) -> List[str]:
        """Fetch all IDs for specified Notion object type.

        Args:
            objects_type: Type of Notion objects to fetch
            limit: Maximum number of IDs to fetch (None for unlimited)

        Returns:
            List[str]: Collection of object IDs

        Note:
            Returns empty list if limit is 0 or negative
        """
        if limit is not None and limit <= 0:
            return []

        self.logger.info(
            f"Fetching all ids of {objects_type.name} objects from Notion API with limit {limit}..."
        )

        params = {
            "filter": {
                "value": objects_type.name.lower(),
                "property": "object",
            },
        }
        results = NotionDatasourceReader._collect_paginated_api(
            self.client.search, limit, **params
        )
        object_ids = [object["id"] for object in results]
        object_ids = object_ids[:limit] if limit is not None else object_ids

        self.logger.info(
            f"Found {len(object_ids)} ids of {objects_type.name} objects in Notion."
        )

        return object_ids

    def _get_current_limit(
        self, database_ids: List[str], page_ids: List[str]
    ) -> int:
        """Calculate remaining object limit based on existing IDs.

        Args:
            database_ids: Currently collected database IDs
            page_ids: Currently collected page IDs

        Returns:
            int: Remaining limit (None if no limit configured)

        Note:
            Subtracts total of existing IDs from configured export limit
        """
        return (
            self.export_limit - len(database_ids) - len(page_ids)
            if self.export_limit
            else None
        )

    @staticmethod
    def _collect_paginated_api(
        function: Callable[..., Any], limit: int, **kwargs: Any
    ) -> List[Any]:
        """Collect all results from paginated Notion API endpoint.

        Args:
            function: API function to call
            limit: Maximum number of results to collect
            **kwargs: Additional arguments for API function

        Returns:
            List[Any]: Collected API results
        """
        next_cursor = kwargs.pop("start_cursor", None)
        result = []

        while True:
            response = function(**kwargs, start_cursor=next_cursor)
            result.extend(response.get("results"))

            if NotionDatasourceReader._limit_reached(result, limit):
                return result[:limit]
            if not NotionDatasourceReader._has_more_pages(response):
                return result[:limit] if limit else result

            next_cursor = response.get("next_cursor")

    @staticmethod
    def _limit_reached(result: List[dict], limit: int) -> bool:
        """Check if result count has reached limit.

        Args:
            result: Current results
            limit: Maximum allowed results

        Returns:
            bool: True if limit reached
        """
        return limit is not None and len(result) >= limit

    @staticmethod
    def _has_more_pages(response: dict) -> bool:
        """Check if more pages are available.

        Args:
            response: API response dictionary

        Returns:
            bool: True if more pages available
        """
        return response.get("has_more") and response.get("next_cursor")
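
A usage sketch for the reader is shown below. It wires the collaborating client and exporter through their factories (documented above); NotionDatasourceReaderFactory, documented next, performs the same wiring. The configuration object is assumed to exist.

import asyncio

# Illustrative sketch; `configuration` is assumed to be a ready
# NotionDatasourceConfiguration instance.
client = NotionClientFactory.create(configuration)
exporter = NotionExporterFactory.create(configuration)

reader = NotionDatasourceReader(
    configuration=configuration,
    client=client,
    exporter=exporter,
)

# Collects database and page IDs (optionally seeded from the home page
# database), exports them in batches and applies the configured export limit.
raw_objects = asyncio.run(reader.read_all_async())
print(f"Exported {len(raw_objects)} Notion objects")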

__init__(configuration, client, exporter, logger=LoggerConfiguration.get_logger(__name__))

Initialize Notion reader.

Parameters:
  • configuration (NotionDatasourceConfiguration) –

    Settings for Notion access and limits

  • client (Client) –

    Client for Notion API interaction

  • exporter (NotionExporter) –

    Component for content export and conversion

  • logger (Logger, default: get_logger(__name__) ) –

    Logger for logging messages and errors

Source code in src/extraction/datasources/notion/reader.py
def __init__(
    self,
    configuration: NotionDatasourceConfiguration,
    client: Client,
    exporter: NotionExporter,
    logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
):
    """Initialize Notion reader.

    Args:
        configuration: Settings for Notion access and limits
        client: Client for Notion API interaction
        exporter: Component for content export and conversion
        logger: Logger for logging messages and errors
    """
    super().__init__()
    self.client = client
    self.export_batch_size = configuration.export_batch_size
    self.export_limit = configuration.export_limit
    self.exporter = exporter
    self.home_page_database_id = configuration.home_page_database_id
    self.logger = logger

read_all_async() async

Asynchronously retrieve all documents from Notion.

Fetches pages and databases in batches, respecting export limits and batch sizes.

Returns:
  • List[NotionDocument]

    List[NotionDocument]: Collection of processed documents

Source code in src/extraction/datasources/notion/reader.py
async def read_all_async(self) -> List[NotionDocument]:
    """Asynchronously retrieve all documents from Notion.

    Fetches pages and databases in batches, respecting export limits
    and batch sizes.

    Returns:
        List[NotionDocument]: Collection of processed documents
    """
    if self.home_page_database_id is None:
        database_ids = []
        page_ids = []
    else:
        database_ids, page_ids = self._get_ids_from_home_page()

    database_ids.extend(
        self._get_all_ids(
            NotionObjectType.DATABASE,
            limit=self._get_current_limit(database_ids, page_ids),
        )
    )
    page_ids.extend(
        self._get_all_ids(
            NotionObjectType.PAGE,
            limit=self._get_current_limit(database_ids, page_ids),
        )
    )

    # Process IDs
    database_ids = set(database_ids)
    database_ids.discard(self.home_page_database_id)
    page_ids = set(page_ids)

    # Batch and export
    chunked_database_ids = list(
        chunked(database_ids, self.export_batch_size)
    )
    chunked_page_ids = list(chunked(page_ids, self.export_batch_size))

    databases, databases_failed = await self._export_documents(
        chunked_database_ids, NotionObjectType.DATABASE
    )
    pages, pages_failed = await self._export_documents(
        chunked_page_ids, NotionObjectType.PAGE
    )

    # Log failures
    if databases_failed:
        self.logger.warning(
            f"Failed to export {len(databases_failed)} databases: {databases_failed}"
        )
    if pages_failed:
        self.logger.warning(
            f"Failed to export {len(pages_failed)} pages: {pages_failed}"
        )

    # Apply limit if needed
    objects = databases + pages
    return (
        objects
        if self.export_limit is None
        else objects[: self.export_limit]
    )

NotionDatasourceReaderFactory

Bases: Factory

Factory for creating NotionDatasourceReader instances.

This class is responsible for creating instances of NotionDatasourceReader with the provided configuration and Notion client.

Attributes:
  • _configuration_class

    The configuration class used to create the NotionDatasourceReader instance.

Source code in src/extraction/datasources/notion/reader.py
class NotionDatasourceReaderFactory(Factory):
    """Factory for creating NotionDatasourceReader instances.

    This class is responsible for creating instances of NotionDatasourceReader
    with the provided configuration and Notion client.

    Attributes:
        _configuration_class: The configuration class used to create the NotionDatasourceReader instance.
    """

    _configuration_class = NotionDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls,
        configuration: NotionDatasourceConfiguration,
    ) -> NotionDatasourceReader:
        client = NotionClientFactory.create(configuration)
        exporter = NotionExporterFactory.create(configuration)
        return NotionDatasourceReader(
            configuration=configuration,
            client=client,
            exporter=exporter,
        )

NotionObjectType

Bases: Enum

Enum representing Notion object types. This enum is used to specify the type of Notion object being processed, such as a page or a database.

Source code in src/extraction/datasources/notion/reader.py
class NotionObjectType(Enum):
    """
    Enum representing Notion object types.
    This enum is used to specify the type of Notion object being processed,
    such as a page or a database.
    """

    PAGE = "page"
    DATABASE = "database"