Bundestag Datasource

This module contains functionality related to the Bundestag datasource.

Client

BundestagMineClient

Bases: APIClient

API Client for the bundestag-mine.de API.

Source code in src/extraction/datasources/bundestag/client.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class BundestagMineClient(APIClient):
    """
    API Client for the bundestag-mine.de API.

    Wraps the DashboardController endpoints for protocols, agenda items,
    speeches and speakers, and validates every payload into Pydantic models.
    Entries that fail validation are logged and skipped.
    """

    BASE_URL = "https://bundestag-mine.de/api/DashboardController"
    logger = LoggerConfiguration.get_logger(__name__)

    def safe_get(self, path: str) -> Optional[Any]:
        """
        Perform a GET request and return the 'result' field of the JSON body.

        Every failure mode is logged and reported as None instead of raising,
        so callers only have to handle a missing result.

        Args:
            path: endpoint path under BASE_URL, e.g. "GetProtocols" or
                  "GetAgendaItemsOfProtocol/<protocol_id>"

        Returns:
            Optional[Any]: The 'result' field of the API response, or None if
            the HTTP status check fails, the body is not valid JSON, the body
            is not a dict with status "200", or 'result' is missing/null.
        """
        url = f"{self.BASE_URL}/{path.lstrip('/')}"
        resp = self.get(url)
        try:
            resp.raise_for_status()
        except Exception as e:
            self.logger.error(f"HTTP error for {url}: {e}")
            return None

        # A body that is not JSON must not escape this "safe" wrapper.
        try:
            data = resp.json()
        except ValueError as e:
            self.logger.error(f"Invalid JSON response for {url}: {e}")
            return None

        # The status field is compared as the string "200".
        if not isinstance(data, dict) or data.get("status") != "200":
            self.logger.error(f"Unexpected response for {url}: {data}")
            return None

        result = data.get("result")
        if result is None:
            self.logger.debug(f"No result found for {url}")
            return None

        return result

    def get_protocols(self) -> Iterator[Protocol]:
        """
        Fetches the list of all protocols.

        Returns:
            Iterator[Protocol]: An iterator of valid protocols as Pydantic models.
            Entries that fail validation are logged and skipped.

        Raises:
            ResponseParseError: if the API result is not a list (this includes
                the case where the request failed and no result is available).
        """
        result = self.safe_get("GetProtocols")
        if not isinstance(result, list):
            raise ResponseParseError(
                f"Expected list of protocols, got: {result}"
            )

        for protocol_data in result:
            try:
                yield Protocol.model_validate(protocol_data)
            except ValidationError as e:
                self.logger.warning(
                    f"Failed to validate protocol: {protocol_data}. Error: {e}"
                )

    def get_agenda_items(self, protocol_id: str) -> Iterator[AgendaItem]:
        """
        Fetches agenda items for a specific protocol ID.

        Args:
            protocol_id (str): The ID of the protocol.

        Returns:
            Iterator[AgendaItem]: An iterator of valid agenda items as Pydantic models.
            Nothing is yielded when the request fails, when the response has
            no 'agendaItems', or when 'agendaItems' is not a list.
        """
        result = self.safe_get(f"GetAgendaItemsOfProtocol/{protocol_id}")
        if result is None:
            self.logger.debug(f"No agenda items found for {protocol_id}")
            return

        items = result.get("agendaItems")

        # Missing agenda items are an expected case, not an error.
        if items is None:
            self.logger.debug(f"No agenda items found for {protocol_id}")
            return
        if not isinstance(items, list):
            self.logger.error(
                f"Expected list of agendaItems for {protocol_id}, got: {items}"
            )
            return

        for item_data in items:
            try:
                yield AgendaItem.model_validate(item_data)
            except ValidationError as e:
                self.logger.warning(
                    f"Failed to validate agenda item: {item_data}. Error: {e}"
                )

    def get_speaker_data(self, speaker_id: str) -> Optional[Speaker]:
        """
        Fetches speaker data for a specific speaker ID.

        Args:
            speaker_id (str): The ID of the speaker.

        Returns:
            Optional[Speaker]: Speaker data as a Pydantic model, or None if the
            API result is not a dict or validation fails.
        """
        result = self.safe_get(f"GetSpeakerById/{speaker_id}")
        if not isinstance(result, dict):
            self.logger.error(
                f"Expected speaker data for {speaker_id}, got: {result}"
            )
            return None

        try:
            return Speaker.model_validate(result)
        except ValidationError as e:
            self.logger.warning(
                f"Failed to validate speaker data for {speaker_id}: {result}. Error: {e}"
            )
            return None

    # NOTE(review): this is a generator function, so its body only runs while
    # the returned iterator is consumed. Confirm that retry_request is meant to
    # (and able to) cover failures raised during iteration.
    @retry_request
    def get_speeches(
        self,
        protocol: Protocol,
        agenda_item: AgendaItem,
    ) -> Iterator[BundestagSpeech]:
        """
        Fetches speeches for a specific agenda item within a protocol.

        The request key is built from the protocol's legislature period and
        number and the agenda item's number, joined by commas and URL-encoded.

        Args:
            protocol (Protocol): The protocol the agenda item belongs to.
            agenda_item (AgendaItem): The agenda item whose speeches are fetched.

        Returns:
            Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic models,
            each with its `protocol` and `agendaItem` attributes set.
        """
        raw = f"{protocol.legislaturePeriod},{protocol.number},{agenda_item.agendaItemNumber}"
        # safe="" so that every reserved character in the key is percent-encoded.
        encoded = quote(raw, safe="")
        result = self.safe_get(f"GetSpeechesOfAgendaItem/{encoded}")

        if result is None:
            self.logger.debug(f"No speeches found for {raw}")
            return

        speeches = result.get("speeches")
        if speeches is None:
            self.logger.debug(f"No speeches found for {raw}")
            return

        if not isinstance(speeches, list):
            self.logger.warning(
                f"Expected list of speeches for {raw}, got: {speeches}"
            )
            return

        for speech in speeches:
            try:
                speech = BundestagSpeech.model_validate(speech)
                speech.protocol = protocol
                speech.agendaItem = agenda_item
                yield speech
            except ValidationError as e:
                # On a validation failure `speech` still holds the raw payload.
                self.logger.warning(
                    f"Failed to validate speech: {speech}. Error: {e}"
                )

    def fetch_all_speeches(self) -> Iterator[BundestagSpeech]:
        """
        Fetches all speeches by iterating through protocols and their agenda items.

        For every speech the speaker is looked up by `speakerId`; speeches
        whose speaker cannot be resolved are dropped.

        Returns:
            Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic models,
            each with its `speaker` attribute set.
        """
        for protocol in self.get_protocols():
            self.logger.info(f"Processing protocol {protocol.id}")

            for agenda_item in self.get_agenda_items(protocol.id):

                for speech in self.get_speeches(
                    protocol=protocol,
                    agenda_item=agenda_item,
                ):
                    speaker = self.get_speaker_data(speech.speakerId)
                    if speaker:
                        speech.speaker = speaker
                        yield speech

fetch_all_speeches()

Fetches all speeches by iterating through protocols and their agenda items.

Returns:
  • Iterator[BundestagSpeech]

    Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic models.

Source code in src/extraction/datasources/bundestag/client.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def fetch_all_speeches(self) -> Iterator[BundestagSpeech]:
    """
    Walk every protocol and agenda item and yield the speeches found.

    The speaker of each speech is looked up via its `speakerId`. Speeches
    whose speaker lookup yields nothing are skipped.

    Returns:
        Iterator[BundestagSpeech]: Speeches with their `speaker` attribute set.
    """
    for current_protocol in self.get_protocols():
        self.logger.info(f"Processing protocol {current_protocol.id}")

        for current_item in self.get_agenda_items(current_protocol.id):
            speeches = self.get_speeches(
                protocol=current_protocol,
                agenda_item=current_item,
            )

            for speech in speeches:
                resolved_speaker = self.get_speaker_data(speech.speakerId)
                if not resolved_speaker:
                    # No usable speaker data: drop the speech.
                    continue

                speech.speaker = resolved_speaker
                yield speech

get_agenda_items(protocol_id)

Fetches agenda items for a specific protocol ID.

Parameters:
  • protocol_id (str) –

    The ID of the protocol.

Returns:
  • Iterator[AgendaItem]

    Iterator[AgendaItem]: An iterator of valid agenda items as Pydantic models.

Source code in src/extraction/datasources/bundestag/client.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def get_agenda_items(self, protocol_id: str) -> Iterator[AgendaItem]:
    """
    Fetches agenda items for a specific protocol ID.

    Args:
        protocol_id (str): The ID of the protocol.

    Returns:
        Iterator[AgendaItem]: An iterator of valid agenda items as Pydantic models.
        Nothing is yielded when the request fails, when the response has no
        'agendaItems', or when 'agendaItems' is not a list. Items that fail
        validation are logged and skipped.
    """
    result = self.safe_get(f"GetAgendaItemsOfProtocol/{protocol_id}")
    if result is None:
        self.logger.debug(f"No agenda items found for {protocol_id}")
        return

    items = result.get("agendaItems")

    # Missing agenda items are an expected case, not an error. The previous
    # check compared the string literal "items" to None and never matched.
    if items is None:
        self.logger.debug(f"No agenda items found for {protocol_id}")
        return
    if not isinstance(items, list):
        self.logger.error(
            f"Expected list of agendaItems for {protocol_id}, got: {items}"
        )
        return

    for item_data in items:
        try:
            yield AgendaItem.model_validate(item_data)
        except ValidationError as e:
            self.logger.warning(
                f"Failed to validate agenda item: {item_data}. Error: {e}"
            )

get_protocols()

Fetches the list of all protocols.

Returns:
  • Iterator[Protocol]

    Iterator[Protocol]: An iterator of valid protocols as Pydantic models.

Source code in src/extraction/datasources/bundestag/client.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def get_protocols(self) -> Iterator[Protocol]:
    """
    Request the full protocol list and yield each entry as a model.

    Returns:
        Iterator[Protocol]: Protocols that passed Pydantic validation. Entries
        that fail validation are logged as warnings and skipped.

    Raises:
        ResponseParseError: if the API result is not a list.
    """
    payload = self.safe_get("GetProtocols")
    if isinstance(payload, list):
        for raw_protocol in payload:
            try:
                validated = Protocol.model_validate(raw_protocol)
                yield validated
            except ValidationError as err:
                self.logger.warning(
                    f"Failed to validate protocol: {raw_protocol}. Error: {err}"
                )
        return

    raise ResponseParseError(f"Expected list of protocols, got: {payload}")

get_speaker_data(speaker_id)

Fetches speaker data for a specific speaker ID.

Parameters:
  • speaker_id (str) –

    The ID of the speaker.

Returns:
  • Optional[Speaker]

    Optional[Speaker]: Speaker data as a Pydantic model, or None if validation fails.

Source code in src/extraction/datasources/bundestag/client.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def get_speaker_data(self, speaker_id: str) -> Optional[Speaker]:
    """
    Look up a single speaker by ID.

    Args:
        speaker_id (str): The ID of the speaker.

    Returns:
        Optional[Speaker]: The validated speaker model, or None when the API
        result is not a dict or the payload fails validation.
    """
    payload = self.safe_get(f"GetSpeakerById/{speaker_id}")

    if isinstance(payload, dict):
        try:
            speaker = Speaker.model_validate(payload)
        except ValidationError as err:
            self.logger.warning(
                f"Failed to validate speaker data for {speaker_id}: {payload}. Error: {err}"
            )
            speaker = None
        return speaker

    # Anything other than a dict (including a failed request) is unusable.
    self.logger.error(
        f"Expected speaker data for {speaker_id}, got: {payload}"
    )
    return None

get_speeches(protocol, agenda_item)

Fetches speeches for a specific agenda item within a protocol.

Parameters:
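  • protocol (Protocol) –

    The protocol the agenda item belongs to; its legislature period and number are used to build the request.

  • agenda_item (AgendaItem) –

    The agenda item whose speeches are fetched; its agenda item number is used to build the request.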

Returns:
  • Iterator[BundestagSpeech]

    Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic models.

Source code in src/extraction/datasources/bundestag/client.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@retry_request
def get_speeches(
    self,
    protocol: Protocol,
    agenda_item: AgendaItem,
) -> Iterator[BundestagSpeech]:
    """
    Yield the speeches held under one agenda item of a protocol.

    The request key is the protocol's legislature period and number plus the
    agenda item's number, comma-separated and fully URL-encoded.

    Args:
        protocol (Protocol): The protocol the agenda item belongs to.
        agenda_item (AgendaItem): The agenda item whose speeches are fetched.

    Returns:
        Iterator[BundestagSpeech]: Validated speeches with `protocol` and
        `agendaItem` set. Speeches that fail validation are logged and skipped.
    """
    lookup_key = f"{protocol.legislaturePeriod},{protocol.number},{agenda_item.agendaItemNumber}"
    encoded_key = quote(lookup_key, safe="")
    payload = self.safe_get(f"GetSpeechesOfAgendaItem/{encoded_key}")

    # A failed request and a response without 'speeches' are treated alike.
    speeches = None if payload is None else payload.get("speeches")
    if speeches is None:
        self.logger.debug(f"No speeches found for {lookup_key}")
        return

    if not isinstance(speeches, list):
        self.logger.warning(
            f"Expected list of speeches for {lookup_key}, got: {speeches}"
        )
        return

    for entry in speeches:
        try:
            entry = BundestagSpeech.model_validate(entry)
            entry.protocol = protocol
            entry.agendaItem = agenda_item
            yield entry
        except ValidationError as err:
            # On a validation failure `entry` still holds the raw payload.
            self.logger.warning(
                f"Failed to validate speech: {entry}. Error: {err}"
            )

safe_get(path)

Perform a GET request, raise for HTTP errors, parse JSON, check API status.

Parameters:
  • path (str) –

    endpoint path under BASE_URL, e.g. "GetProtocols" or "GetAgendaItemsOfProtocol/<protocol_id>"

Returns:
  • Optional[Any]

    The 'result' field of the API response, or None if the request fails, the API status is not "200", or no result is present.

Raises:
  • ResponseParseError

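    Listed in the docstring, but not raised by the implementation shown below: if the HTTP status is not OK or the JSON structure is unexpected, the error is logged and None is returned.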

Source code in src/extraction/datasources/bundestag/client.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def safe_get(self, path: str) -> Optional[Any]:
    """
    Perform a GET request and return the 'result' field of the JSON body.

    Every failure mode is logged and reported as None instead of raising,
    so callers only have to handle a missing result.

    Args:
        path: endpoint path under BASE_URL, e.g. "GetProtocols" or
              "GetAgendaItemsOfProtocol/<protocol_id>"

    Returns:
        Optional[Any]: The 'result' field of the API response, or None if the
        HTTP status check fails, the body is not valid JSON, the body is not
        a dict with status "200", or 'result' is missing/null.
    """
    url = f"{self.BASE_URL}/{path.lstrip('/')}"
    resp = self.get(url)
    try:
        resp.raise_for_status()
    except Exception as e:
        self.logger.error(f"HTTP error for {url}: {e}")
        return None

    # A body that is not JSON must not escape this "safe" wrapper.
    try:
        data = resp.json()
    except ValueError as e:
        self.logger.error(f"Invalid JSON response for {url}: {e}")
        return None

    # The status field is compared as the string "200".
    if not isinstance(data, dict) or data.get("status") != "200":
        self.logger.error(f"Unexpected response for {url}: {data}")
        return None

    result = data.get("result")
    if result is None:
        self.logger.debug(f"No result found for {url}")
        return None

    return result

BundestagMineClientFactory

Bases: SingletonFactory

Factory for creating and managing Bundestag client instances.

This factory ensures only one Bundestag client is created per configuration, following the singleton pattern provided by the parent SingletonFactory class.

Source code in src/extraction/datasources/bundestag/client.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
class BundestagMineClientFactory(SingletonFactory):
    """
    Singleton factory that hands out BundestagMineClient instances.

    Instance caching is delegated to the parent SingletonFactory; this class
    only declares the configuration type and how a client is built.
    """

    _configuration_class: Type = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BundestagMineClient:
        """
        Build a new BundestagMine client.

        Args:
            configuration: BundestagMine datasource configuration. It is not
                read here; the client is constructed without arguments.

        Returns:
            A new BundestagMineClient instance.
        """
        client = BundestagMineClient()
        return client

BundestagSpeech

Bases: BaseModel

Represents a speech from BundestagMine API.

Source code in src/extraction/datasources/bundestag/client.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class BundestagSpeech(BaseModel):
    """A single speech as returned by the BundestagMine API."""

    # Fields validated from the speech payload.
    id: str
    speakerId: str
    text: str
    # Related objects, unset after validation and attached afterwards.
    speaker: Optional[Speaker] = None
    protocol: Optional[Protocol] = None
    agendaItem: Optional[AgendaItem] = None

    @model_validator(mode="after")
    def validate_text_not_empty(self) -> "BundestagSpeech":
        """Reject speeches whose text is empty or only whitespace."""
        has_content = bool(self.text) and self.text.strip() != ""
        if has_content:
            return self
        raise ValueError("BundestagSpeech text cannot be empty")

Configuration

Document

BundestagMineDocument

Bases: BaseDocument

Represents a document from the Bundestag datasource. Inherits from BaseDocument and includes additional metadata specific to Bundestag documents.

Source code in src/extraction/datasources/bundestag/document.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class BundestagMineDocument(BaseDocument):
    """
    Represents a document from the Bundestag datasource.

    Inherits from BaseDocument and declares which metadata keys are listed
    for embedding and for LLM use. Both lists contain the same keys.
    """

    # Metadata keys listed for embedding.
    # NOTE(review): how BaseDocument consumes this list is not visible here.
    included_embed_metadata_keys: List[str] = [
        "title",
        "created_time",
        "last_edited_time",
        "speaker_party",
        "speaker",
        "protocol_number",
        "legislature_period",
    ]

    # Metadata keys listed for the LLM; same keys as the embedding list above.
    included_llm_metadata_keys: List[str] = [
        "title",
        "created_time",
        "last_edited_time",
        "speaker_party",
        "speaker",
        "protocol_number",
        "legislature_period",
    ]

Manager

BundestagMineDatasourceManagerFactory

Bases: Factory

Factory for creating datasource managers.

Provides type-safe creation of datasource managers based on configuration.

Attributes:
  • _configuration_class (Type) –

    Type of configuration object

Source code in src/extraction/datasources/bundestag/manager.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class BundestagMineDatasourceManagerFactory(Factory):
    """Factory that assembles the BundestagMine datasource manager.

    Attributes:
        _configuration_class: Configuration type this factory is bound to.
    """

    _configuration_class: Type = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BasicDatasourceManager:
        """Build a BasicDatasourceManager for BundestagMine.

        The reader and the parser are obtained from their own factories using
        the same configuration, then passed to the manager.

        Args:
            configuration: BundestagMine datasource configuration handed to
                the reader factory, the parser factory and the manager.

        Returns:
            A BasicDatasourceManager wired with the BundestagMine reader and parser.
        """
        return BasicDatasourceManager(
            configuration=configuration,
            reader=BundestagMineDatasourceReaderFactory.create(configuration),
            parser=BundestagMineDatasourceParserFactory.create(configuration),
        )

Parser

BundestagMineDatasourceParser

Bases: BaseParser[BundestagMineDocument]

Source code in src/extraction/datasources/bundestag/parser.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class BundestagMineDatasourceParser(BaseParser[BundestagMineDocument]):
    """
    Converts BundestagSpeech objects into BundestagMineDocument objects.
    """

    logger = LoggerConfiguration.get_logger(__name__)

    def parse(self, speech: BundestagSpeech) -> BundestagMineDocument:
        """
        Parse a speech into a BundestagMineDocument object.

        Args:
            speech: Speech to convert. Its `protocol`, `agendaItem` and
                `speaker` attributes must be set, because the metadata is
                derived from them.

        Returns:
            Parsed document of type BundestagMineDocument, with the speech
            text as content and the extracted metadata attached.
        """
        metadata = self._extract_metadata(speech)
        return BundestagMineDocument(text=speech.text, metadata=metadata)

    def _extract_metadata(self, speech: BundestagSpeech) -> dict:
        """
        Build the metadata dictionary for a speech.

        Args:
            speech: Speech with `protocol`, `agendaItem` and `speaker` set.

        Returns:
            Dictionary containing extracted metadata: datasource, language,
            url, title, format, created/last edited time, speaker party and
            name, agenda item number, protocol number and legislature period.
        """
        legislature_period = speech.protocol.legislaturePeriod
        protocol_number = speech.protocol.number
        agenda_item_number = speech.agendaItem.agendaItemNumber

        # Plenary protocol PDFs on dserver are named with the two-digit
        # legislature period followed by the three-digit, zero-padded session
        # number (e.g. btp/20/20001.pdf). Without padding, sessions below 100
        # would produce a wrong link.
        period_part = str(legislature_period).zfill(2)
        number_part = str(protocol_number).zfill(3)
        url = f"https://dserver.bundestag.de/btp/{period_part}/{period_part}{number_part}.pdf"
        title = f"Protocol/Legislature/AgendaItem {protocol_number}/{legislature_period}/{agenda_item_number}"
        speaker_name = f"{speech.speaker.firstName} {speech.speaker.lastName}"

        return {
            "datasource": "bundestag",
            "language": "de",
            "url": url,
            "title": title,
            "format": "md",
            # The protocol date is used for both timestamps.
            "created_time": speech.protocol.date,
            "last_edited_time": speech.protocol.date,
            "speaker_party": speech.speaker.party,
            "speaker": speaker_name,
            "agenda_item_number": agenda_item_number,
            "protocol_number": protocol_number,
            "legislature_period": legislature_period,
        }

parse(speech)

Parse content into a BundestagMineDocument object.

Parameters:
  • speech (BundestagSpeech) –

    The speech to be parsed, including its protocol, agenda item, and speaker data.

Returns:
  • BundestagMineDocument

    Parsed document of type BundestagMineDocument

Source code in src/extraction/datasources/bundestag/parser.py
17
18
19
20
21
22
23
24
25
26
27
28
def parse(self, speech: BundestagSpeech) -> BundestagMineDocument:
    """
    Convert a speech into a BundestagMineDocument.

    Args:
        speech: Speech to convert; its text becomes the document text and
            its metadata is produced by `_extract_metadata`.

    Returns:
        Parsed document of type BundestagMineDocument
    """
    return BundestagMineDocument(
        metadata=self._extract_metadata(speech),
        text=speech.text,
    )

BundestagMineDatasourceParserFactory

Bases: Factory

Factory for creating instances of BundestagMineDatasourceParser.

Creates and configures BundestagMineDatasourceParser objects according to the provided configuration.

Source code in src/extraction/datasources/bundestag/parser.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class BundestagMineDatasourceParserFactory(Factory):
    """
    Factory producing BundestagMineDatasourceParser instances.

    Bound to BundestagMineDatasourceConfiguration as its configuration type.
    """

    _configuration_class: Type = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BundestagMineDatasourceParser:
        """
        Build a new BundestagMineDatasourceParser.

        Args:
            configuration: Configuration for the parser (not used in this implementation)

        Returns:
            A new BundestagMineDatasourceParser instance.
        """
        parser = BundestagMineDatasourceParser()
        return parser

Reader

BundestagMineDatasourceReader

Bases: BaseReader

Reader for extracting speeches from the BundestagMine API.

Implements document extraction from the Bundestag speeches.

Source code in src/extraction/datasources/bundestag/reader.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class BundestagMineDatasourceReader(BaseReader):
    """Reader for extracting speeches from the BundestagMine API.

    Streams speeches from a BundestagMineClient and stops once the configured
    export limit is reached.
    """

    def __init__(
        self,
        configuration: BundestagMineDatasourceConfiguration,
        client: BundestagMineClient,
        logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
    ):
        """Initialize the BundestagMine reader.

        Args:
            configuration: Settings for BundestagMine access and export limits
            client: Client for BundestagMine API interactions
            logger: Logger instance for recording operation information
        """
        super().__init__()
        self.export_limit = configuration.export_limit
        self.client = client
        self.logger = logger

    async def read_all_async(
        self,
    ) -> AsyncIterator[dict]:
        """Asynchronously fetch all speeches from BundestagMine.

        Yields the speech objects produced by the client's
        `fetch_all_speeches()` until the export limit is reached. The client
        iterator itself is consumed synchronously.

        Returns:
            AsyncIterator[dict]: An async iterator over the fetched speeches.
            NOTE(review): the yielded items are whatever the client produces,
            not plain dicts as the annotation suggests; confirm the intended type.
        """
        self.logger.info(
            f"Reading speeches from BundestagMine with limit {self.export_limit}"
        )
        speech_iterator = self.client.fetch_all_speeches()
        yield_counter = 0

        for speech in speech_iterator:
            if self._limit_reached(yield_counter, self.export_limit):
                return

            # Count first so the progress log is 1-based: the first speech is
            # reported as 1/limit rather than 0/limit.
            yield_counter += 1
            self.logger.info(
                f"Fetched Bundestag speech {yield_counter}/{self.export_limit}."
            )
            yield speech

__init__(configuration, client, logger=LoggerConfiguration.get_logger(__name__))

Initialize the BundestagMine reader.

Parameters:
  • configuration (BundestagMineDatasourceConfiguration) –

    Settings for BundestagMine access and export limits

  • client (BundestagMineClient) –

    Client for BundestagMine API interactions

  • logger (Logger, default: get_logger(__name__) ) –

    Logger instance for recording operation information

Source code in src/extraction/datasources/bundestag/reader.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(
    self,
    configuration: BundestagMineDatasourceConfiguration,
    client: BundestagMineClient,
    logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
):
    """Set up the reader with its client, logger and export limit.

    Args:
        configuration: BundestagMine settings; only `export_limit` is read here
        client: Client used for BundestagMine API interactions
        logger: Logger instance for recording operation information
    """
    super().__init__()
    self.logger = logger
    self.client = client
    self.export_limit = configuration.export_limit

read_all_async() async

Asynchronously fetch all speeches from BundestagMine.

Yields each speech as a dictionary containing its content and metadata.

Returns:
  • AsyncIterator[dict]

    An async iterator of speeches containing content and metadata such as text, speaker data, and last update information.

Source code in src/extraction/datasources/bundestag/reader.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
async def read_all_async(
    self,
) -> AsyncIterator[dict]:
    """Asynchronously fetch all speeches from BundestagMine.

    Yields the speech objects produced by the client's `fetch_all_speeches()`
    until the export limit is reached. The client iterator itself is consumed
    synchronously.

    Returns:
        AsyncIterator[dict]: An async iterator over the fetched speeches.
        NOTE(review): the yielded items are whatever the client produces,
        not plain dicts as the annotation suggests; confirm the intended type.
    """
    self.logger.info(
        f"Reading speeches from BundestagMine with limit {self.export_limit}"
    )
    speech_iterator = self.client.fetch_all_speeches()
    yield_counter = 0

    for speech in speech_iterator:
        if self._limit_reached(yield_counter, self.export_limit):
            return

        # Count first so the progress log is 1-based: the first speech is
        # reported as 1/limit rather than 0/limit.
        yield_counter += 1
        self.logger.info(
            f"Fetched Bundestag speech {yield_counter}/{self.export_limit}."
        )
        yield speech

BundestagMineDatasourceReaderFactory

Bases: Factory

Factory for creating BundestagMine reader instances.

Creates and configures BundestagMineDatasourceReader objects with appropriate clients based on the provided configuration.

Source code in src/extraction/datasources/bundestag/reader.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class BundestagMineDatasourceReaderFactory(Factory):
    """Factory producing BundestagMineDatasourceReader instances.

    The reader's client is obtained from BundestagMineClientFactory using the
    same configuration.
    """

    _configuration_class = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BundestagMineDatasourceReader:
        """Build a reader together with its BundestagMine client.

        Args:
            configuration: BundestagMine datasource configuration, passed to
                both the client factory and the reader

        Returns:
            BundestagMineDatasourceReader: Reader wired with the created client
        """
        return BundestagMineDatasourceReader(
            client=BundestagMineClientFactory.create(configuration),
            configuration=configuration,
        )