Orchestrator

This module contains the basic datasource orchestrator for extraction.orchestrators.basic.

Orchestrator

BasicDatasourceOrchestrator

Bases: BaseDatasourceOrchestrator

Orchestrator for multi-datasource content processing.

Source code in src/extraction/orchestrators/basic/orchestrator.py (lines 12–33):
class BasicDatasourceOrchestrator(BaseDatasourceOrchestrator):
    """
    Orchestrator for multi-datasource content processing.

    Delegates to each manager in ``self.datasource_managers`` in turn and
    re-yields the documents those managers produce. (``datasource_managers``
    is presumably stored by the base-class constructor -- the factory passes
    it as a keyword argument; confirm against ``BaseDatasourceOrchestrator``.)
    """

    async def full_refresh_sync(self) -> AsyncIterator[BaseDocument]:
        """Stream the documents from a full refresh of every datasource.

        Managers are drained sequentially, in the order they appear in
        ``self.datasource_managers``: every document from one manager's
        ``full_refresh_sync()`` stream is yielded before the next manager's
        stream is opened.

        Returns:
            AsyncIterator[BaseDocument]: Stream of documents extracted from all datasources
        """
        for datasource_manager in self.datasource_managers:
            async for document in datasource_manager.full_refresh_sync():
                yield document

    async def incremental_sync(self) -> AsyncIterator[BaseDocument]:
        """Placeholder for incremental sync; not implemented yet.

        This is an async generator, like ``full_refresh_sync``, so it can be
        consumed with ``async for`` as its return annotation promises.

        Raises:
            NotImplementedError: On the first iteration of the returned
                async iterator.
        """
        raise NotImplementedError("Incremental sync is not supported yet.")
        # Unreachable on purpose: the presence of ``yield`` turns this into an
        # async generator, so ``async for`` surfaces NotImplementedError
        # instead of a TypeError about iterating a bare coroutine.
        yield  # pragma: no cover

full_refresh_sync() async

Extract and process content from all datasources.

Processes each configured datasource to extract documents and clean content.

Returns:
  • AsyncIterator[BaseDocument]

    AsyncIterator[BaseDocument]: Stream of documents extracted from all datasources

Source code in src/extraction/orchestrators/basic/orchestrator.py (lines 17–27):
async def full_refresh_sync(self) -> AsyncIterator[BaseDocument]:
    """Yield every document from a full refresh of each datasource.

    The managers in ``self.datasource_managers`` are consumed one at a time,
    in list order: all documents from one manager's ``full_refresh_sync()``
    stream are re-yielded before the next manager's stream is opened.

    Returns:
        AsyncIterator[BaseDocument]: Stream of documents extracted from all datasources
    """
    managers = self.datasource_managers
    for manager in managers:
        document_stream = manager.full_refresh_sync()
        async for doc in document_stream:
            yield doc

incremental_sync() async

Not implemented yet.

Source code in src/extraction/orchestrators/basic/orchestrator.py (lines 29–33):
async def incremental_sync(self) -> AsyncIterator[BaseDocument]:
    """Placeholder for incremental sync; not implemented yet.

    This is an async generator, so it can be consumed with ``async for`` as
    its return annotation promises.

    Raises:
        NotImplementedError: On the first iteration of the returned async
            iterator.
    """
    raise NotImplementedError("Incremental sync is not supported yet.")
    # Unreachable on purpose: the presence of ``yield`` turns this into an
    # async generator, so ``async for`` surfaces NotImplementedError instead
    # of a TypeError about iterating a bare coroutine.
    yield  # pragma: no cover

BasicDatasourceOrchestratorFactory

Bases: Factory

Factory for creating BasicDatasourceOrchestrator instances.

Creates orchestrator instances configured with appropriate datasource managers based on the provided extraction configuration.

Source code in src/extraction/orchestrators/basic/orchestrator.py (lines 36–68):
class BasicDatasourceOrchestratorFactory(Factory):
    """Build ``BasicDatasourceOrchestrator`` objects from extraction settings.

    One datasource manager is created per entry in the configuration's
    datasource list, and the resulting managers are handed to a new
    orchestrator.
    """

    _configuration_class: Type = ExtractionConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: ExtractionConfiguration
    ) -> BasicDatasourceOrchestrator:
        """Instantiate an orchestrator wired to every configured datasource.

        For each datasource configuration in
        ``configuration.extraction.datasources``, the entry registered in
        ``DatasourceManagerRegistry`` under that configuration's ``name`` is
        looked up, and its ``create`` method is called with the datasource
        configuration to obtain a manager.

        Args:
            configuration: Extraction settings listing the datasources to use.

        Returns:
            BasicDatasourceOrchestrator: Orchestrator holding one manager per
            configured datasource, in configuration order.
        """
        managers = []
        for ds_config in configuration.extraction.datasources:
            registered = DatasourceManagerRegistry.get(ds_config.name)
            managers.append(registered.create(ds_config))
        return BasicDatasourceOrchestrator(datasource_managers=managers)