Bundestag Datasource

This module contains functionality related to the Bundestag datasource.

Client

BundestagMineClient

Bases: APIClient

API Client for the bundestag-mine.de API.

Source code in src/extraction/datasources/bundestag/client.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
class BundestagMineClient(APIClient):
    """
    API Client for the bundestag-mine.de API.

    Exposes typed accessors for protocols, agenda items, speakers and
    speeches. Network or parsing failures are logged and surfaced as
    None / empty iterators; only get_protocols raises on a malformed
    top-level response.
    """

    BASE_URL = "https://bundestag-mine.de/api/DashboardController"
    logger = LoggerConfiguration.get_logger(__name__)

    def safe_get(self, path: str) -> Optional[Any]:
        """
        Perform a GET request, check HTTP errors, parse JSON, check API status.

        Args:
            path: endpoint path under BASE_URL, e.g. "GetProtocols" or
                  "GetAgendaItemsOfProtocol/<protocol_id>"

        Returns:
            Optional[Any]: The 'result' field of the API response, or None
            when the request fails, the HTTP status is an error, or the
            payload structure is unexpected.
        """
        url = f"{self.BASE_URL}/{path.lstrip('/')}"

        try:
            resp = self.get(url)
        except Exception as e:
            # Generic message: this helper serves every endpoint, not only
            # the speeches one (the previous text was misleading).
            self.logger.warning(f"Failed to fetch {url}: {e}")
            return None

        try:
            resp.raise_for_status()
        except Exception as e:
            self.logger.error(f"HTTP error for {url}: {e}")
            return None

        data = resp.json()
        # The API reports its own status as a string inside the JSON body.
        if not isinstance(data, dict) or data.get("status") != "200":
            self.logger.error(f"Unexpected response for {url}: {data}")
            return None

        result = data.get("result")
        if result is None:
            self.logger.debug(f"No result found for {url}")
            return None

        return result

    def get_protocols(self) -> Iterator[Protocol]:
        """
        Fetches the list of all protocols.

        Returns:
            Iterator[Protocol]: valid protocols as Pydantic models; entries
            that fail validation are logged and skipped.

        Raises:
            ResponseParseError: if the API did not return a list.
        """
        result = self.safe_get("GetProtocols")
        if not isinstance(result, list):
            raise ResponseParseError(
                f"Expected list of protocols, got: {result}"
            )

        for protocol_data in result:
            try:
                yield Protocol.model_validate(protocol_data)
            except ValidationError as e:
                self.logger.warning(
                    f"Failed to validate protocol: {protocol_data}. Error: {e}"
                )

    def get_agenda_items(self, protocol_id: str) -> Iterator[AgendaItem]:
        """
        Fetches agenda items for a specific protocol ID.

        Args:
            protocol_id (str): The ID of the protocol.

        Returns:
            Iterator[AgendaItem]: valid agenda items as Pydantic models;
            entries that fail validation are logged and skipped.
        """
        result = self.safe_get(f"GetAgendaItemsOfProtocol/{protocol_id}")
        if result is None:
            self.logger.debug(f"No agenda items found for {protocol_id}")
            return

        items = result.get("agendaItems")

        # Bug fix: the original compared the string literal "items" with
        # None (always False); check the fetched value instead so a missing
        # key is logged at debug level, not as an error.
        if items is None:
            self.logger.debug(f"No agenda items found for {protocol_id}")
            return
        if not isinstance(items, list):
            self.logger.error(
                f"Expected list of agendaItems for {protocol_id}, got: {items}"
            )
            return

        for item_data in items:
            try:
                yield AgendaItem.model_validate(item_data)
            except ValidationError as e:
                self.logger.warning(
                    f"Failed to validate agenda item: {item_data}. Error: {e}"
                )

    def get_speaker_data(self, speaker_id: str) -> Optional[Speaker]:
        """
        Fetches speaker data for a specific speaker ID.

        Args:
            speaker_id (str): The ID of the speaker.

        Returns:
            Optional[Speaker]: Speaker data as a Pydantic model, or None if
            the response is not a dict or validation fails.
        """
        result = self.safe_get(f"GetSpeakerById/{speaker_id}")
        if not isinstance(result, dict):
            self.logger.error(
                f"Expected speaker data for {speaker_id}, got: {result}"
            )
            return None

        try:
            return Speaker.model_validate(result)
        except ValidationError as e:
            self.logger.warning(
                f"Failed to validate speaker data for {speaker_id}: {result}. Error: {e}"
            )
            return None

    @retry_request
    def get_speeches(
        self,
        protocol: Protocol,
        agenda_item: AgendaItem,
    ) -> Iterator[BundestagSpeech]:
        """
        Fetches speeches for a specific agenda item within a protocol.

        Args:
            protocol (Protocol): The protocol the agenda item belongs to.
            agenda_item (AgendaItem): The agenda item whose speeches are fetched.

        Returns:
            Iterator[BundestagSpeech]: valid speeches as Pydantic models,
            each annotated with the given protocol and agenda item.
        """
        # The endpoint takes a comma-separated composite key, URL-encoded.
        raw = f"{protocol.legislaturePeriod},{protocol.number},{agenda_item.agendaItemNumber}"
        encoded = quote(raw, safe="")
        result = self.safe_get(f"GetSpeechesOfAgendaItem/{encoded}")

        if result is None:
            self.logger.debug(f"No speeches found for {raw}")
            return

        speeches = result.get("speeches")
        if speeches is None:
            self.logger.debug(f"No speeches found for {raw}")
            return

        if not isinstance(speeches, list):
            self.logger.warning(
                f"Expected list of speeches for {raw}, got: {speeches}"
            )
            return

        for speech in speeches:
            try:
                speech = BundestagSpeech.model_validate(speech)
                speech.protocol = protocol
                speech.agendaItem = agenda_item
                yield speech
            except ValidationError as e:
                # When validation raises, `speech` still holds the raw
                # payload, so the log shows what the API actually sent.
                self.logger.warning(
                    f"Failed to validate speech: {speech}. Error: {e}"
                )

    def fetch_all_speeches(self) -> Iterator[BundestagSpeech]:
        """
        Fetches all speeches by iterating through protocols and their agenda items.

        Only speeches whose speaker data resolves are yielded; each one has
        its `speaker` attribute populated.

        Returns:
            Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic models.
        """
        for protocol in self.get_protocols():
            self.logger.info(f"Processing protocol {protocol.id}")

            for agenda_item in self.get_agenda_items(protocol.id):

                for speech in self.get_speeches(
                    protocol=protocol,
                    agenda_item=agenda_item,
                ):
                    speaker = self.get_speaker_data(speech.speakerId)
                    if speaker:
                        speech.speaker = speaker
                        yield speech

fetch_all_speeches()

Fetches all speeches by iterating through protocols and their agenda items.

Returns:
  • Iterator[BundestagSpeech]

    Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic models.

Source code in src/extraction/datasources/bundestag/client.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def fetch_all_speeches(self) -> Iterator[BundestagSpeech]:
    """
    Iterate over every speech across all protocols and agenda items.

    Only speeches whose speaker data could be fetched and validated are
    yielded; each yielded speech has its `speaker` attribute populated.

    Returns:
        Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic models.
    """
    for protocol in self.get_protocols():
        self.logger.info(f"Processing protocol {protocol.id}")

        agenda_items = self.get_agenda_items(protocol.id)
        for agenda_item in agenda_items:
            speeches = self.get_speeches(
                protocol=protocol,
                agenda_item=agenda_item,
            )
            for speech in speeches:
                speaker = self.get_speaker_data(speech.speakerId)
                if not speaker:
                    continue
                speech.speaker = speaker
                yield speech

get_agenda_items(protocol_id)

Fetches agenda items for a specific protocol ID.

Parameters:
  • protocol_id (str) –

    The ID of the protocol.

Returns:
  • Iterator[AgendaItem]

    Iterator[AgendaItem]: An iterator of valid agenda items as Pydantic models.

Source code in src/extraction/datasources/bundestag/client.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def get_agenda_items(self, protocol_id: str) -> Iterator[AgendaItem]:
    """
    Fetches agenda items for a specific protocol ID.

    Args:
        protocol_id (str): The ID of the protocol.

    Returns:
        Iterator[AgendaItem]: valid agenda items as Pydantic models;
        entries that fail validation are logged and skipped.
    """
    result = self.safe_get(f"GetAgendaItemsOfProtocol/{protocol_id}")
    if result is None:
        self.logger.debug(f"No agenda items found for {protocol_id}")
        return

    items = result.get("agendaItems")

    # Bug fix: the original compared the string literal "items" with None
    # (always False); check the fetched value instead so a missing key is
    # logged at debug level, not as an error.
    if items is None:
        self.logger.debug(f"No agenda items found for {protocol_id}")
        return
    if not isinstance(items, list):
        self.logger.error(
            f"Expected list of agendaItems for {protocol_id}, got: {items}"
        )
        return

    for item_data in items:
        try:
            yield AgendaItem.model_validate(item_data)
        except ValidationError as e:
            self.logger.warning(
                f"Failed to validate agenda item: {item_data}. Error: {e}"
            )

get_protocols()

Fetches the list of all protocols.

Returns:
  • Iterator[Protocol]

    Iterator[Protocol]: An iterator of valid protocols as Pydantic models.

Source code in src/extraction/datasources/bundestag/client.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def get_protocols(self) -> Iterator[Protocol]:
    """
    Fetch the full list of plenary protocols.

    Returns:
        Iterator[Protocol]: validated protocols; entries that fail
        validation are logged and skipped.

    Raises:
        ResponseParseError: if the API did not return a list.
    """
    payload = self.safe_get("GetProtocols")
    if not isinstance(payload, list):
        raise ResponseParseError(
            f"Expected list of protocols, got: {payload}"
        )

    for entry in payload:
        try:
            protocol = Protocol.model_validate(entry)
        except ValidationError as e:
            self.logger.warning(
                f"Failed to validate protocol: {entry}. Error: {e}"
            )
        else:
            yield protocol

get_speaker_data(speaker_id)

Fetches speaker data for a specific speaker ID.

Parameters:
  • speaker_id (str) –

    The ID of the speaker.

Returns:
  • Optional[Speaker]

    Optional[Speaker]: Speaker data as a Pydantic model, or None if validation fails.

Source code in src/extraction/datasources/bundestag/client.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def get_speaker_data(self, speaker_id: str) -> Optional[Speaker]:
    """
    Fetch and validate the speaker record for the given ID.

    Args:
        speaker_id (str): The ID of the speaker.

    Returns:
        Optional[Speaker]: the validated speaker model, or None when the
        API response is not a dict or fails validation.
    """
    payload = self.safe_get(f"GetSpeakerById/{speaker_id}")
    if not isinstance(payload, dict):
        self.logger.error(
            f"Expected speaker data for {speaker_id}, got: {payload}"
        )
        return None

    try:
        speaker = Speaker.model_validate(payload)
    except ValidationError as e:
        self.logger.warning(
            f"Failed to validate speaker data for {speaker_id}: {payload}. Error: {e}"
        )
        return None
    return speaker

get_speeches(protocol, agenda_item)

Fetches speeches for a specific agenda item within a protocol.

Parameters:
  • protocol (Protocol) –

    The protocol the agenda item belongs to.

  • agenda_item (AgendaItem) –

    The agenda item whose speeches are fetched.

Returns:
  • Iterator[BundestagSpeech]

    Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic models.

Source code in src/extraction/datasources/bundestag/client.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
@retry_request
def get_speeches(
    self,
    protocol: Protocol,
    agenda_item: AgendaItem,
) -> Iterator[BundestagSpeech]:
    """
    Fetches speeches for a specific agenda item within a protocol.

    Args:
        protocol (Protocol): The protocol the agenda item belongs to.
        agenda_item (AgendaItem): The agenda item whose speeches are fetched.

    Returns:
        Iterator[BundestagSpeech]: An iterator of valid speeches as Pydantic
        models, each annotated with the given protocol and agenda item.
    """
    # The endpoint takes a comma-separated composite key, URL-encoded.
    raw = f"{protocol.legislaturePeriod},{protocol.number},{agenda_item.agendaItemNumber}"
    encoded = quote(raw, safe="")
    result = self.safe_get(f"GetSpeechesOfAgendaItem/{encoded}")

    if result is None:
        self.logger.debug(f"No speeches found for {raw}")
        return

    speeches = result.get("speeches")
    if speeches is None:
        self.logger.debug(f"No speeches found for {raw}")
        return

    if not isinstance(speeches, list):
        self.logger.warning(
            f"Expected list of speeches for {raw}, got: {speeches}"
        )
        return

    for speech in speeches:
        try:
            speech = BundestagSpeech.model_validate(speech)
            speech.protocol = protocol
            speech.agendaItem = agenda_item
            yield speech
        except ValidationError as e:
            # When validation raises, `speech` still holds the raw payload,
            # so the log shows what the API actually sent.
            self.logger.warning(
                f"Failed to validate speech: {speech}. Error: {e}"
            )

safe_get(path)

Perform a GET request, raise for HTTP errors, parse JSON, check API status.

Parameters:
  • path (str) –

    endpoint path under BASE_URL, e.g. "GetProtocols" or "GetAgendaItemsOfProtocol/"

Returns:
  • Optional[Any]

    Dict[str, Any]: The 'result' field of the API response as a dict.

Note: this method does not raise on failure — it returns None when the
request fails, the HTTP status is not OK, or the JSON structure is
unexpected, logging the problem instead.

Source code in src/extraction/datasources/bundestag/client.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def safe_get(self, path: str) -> Optional[Any]:
    """
    Perform a GET request, check HTTP errors, parse JSON, check API status.

    Args:
        path: endpoint path under BASE_URL, e.g. "GetProtocols" or
              "GetAgendaItemsOfProtocol/<protocol_id>"

    Returns:
        Optional[Any]: The 'result' field of the API response, or None
        when the request fails, the HTTP status is an error, or the
        payload structure is unexpected.
    """
    url = f"{self.BASE_URL}/{path.lstrip('/')}"

    try:
        resp = self.get(url)
    except Exception as e:
        # Generic message: this helper serves every endpoint, not only
        # the speeches one (the previous text was misleading).
        self.logger.warning(f"Failed to fetch {url}: {e}")
        return None

    try:
        resp.raise_for_status()
    except Exception as e:
        self.logger.error(f"HTTP error for {url}: {e}")
        return None

    data = resp.json()
    # The API reports its own status as a string inside the JSON body.
    if not isinstance(data, dict) or data.get("status") != "200":
        self.logger.error(f"Unexpected response for {url}: {data}")
        return None

    result = data.get("result")
    if result is None:
        self.logger.debug(f"No result found for {url}")
        return None

    return result

BundestagMineClientFactory

Bases: SingletonFactory

Factory for creating and managing Bundestag client instances.

This factory ensures only one Bundestag client is created per configuration, following the singleton pattern provided by the parent SingletonFactory class.

Source code in src/extraction/datasources/bundestag/client.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
class BundestagMineClientFactory(SingletonFactory):
    """
    Singleton factory producing BundestagMine client instances.

    At most one client exists per configuration object, courtesy of the
    singleton behavior inherited from SingletonFactory.
    """

    # Configuration type this factory accepts.
    _configuration_class: Type = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BundestagMineClient:
        """
        Build a fresh BundestagMine client.

        Args:
            configuration: BundestagMine datasource configuration (not used
                by the client itself, which needs no per-instance settings).

        Returns:
            BundestagMineClient: a client ready for API interactions.
        """
        return BundestagMineClient()

BundestagSpeech

Bases: BaseModel

Represents a speech from BundestagMine API.

Source code in src/extraction/datasources/bundestag/client.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class BundestagSpeech(BaseModel):
    """A single speech returned by the BundestagMine API."""

    id: str
    speakerId: str
    text: str
    # Optional context attached after fetching, not part of the raw payload.
    speaker: Optional[Speaker] = None
    protocol: Optional[Protocol] = None
    agendaItem: Optional[AgendaItem] = None

    @model_validator(mode="after")
    def validate_text_not_empty(self) -> "BundestagSpeech":
        # Reject speeches whose text is missing or only whitespace.
        stripped = (self.text or "").strip()
        if not stripped:
            raise ValueError("BundestagSpeech text cannot be empty")
        return self

Client_dip

DIP API Client for Bundestag datasource using deutschland package.

This client provides access to the DIP (Dokumentations- und Informationssystem für Parlamentsmaterialien) API, which is the official German Bundestag API for parliamentary materials.

DIPClient

Client for DIP (Dokumentations- und Informationssystem) API.

Uses the deutschland Python package for API access, providing access to plenary protocols, printed materials, and proceedings.

Source code in src/extraction/datasources/bundestag/client_dip.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
class DIPClient:
    """
    Client for DIP (Dokumentations- und Informationssystem) API.

    Uses the deutschland Python package for API access, providing
    access to plenary protocols, printed materials, and proceedings.
    """

    # Public test key (see __init__ docstring) — not a secret.
    # NOTE(review): hard-coded key; confirm it is still valid and override
    # via the api_key argument for production use.
    DEFAULT_API_KEY = "OSOegLs.PR2lwJ1dwCeje9vTj7FPOt3hvpYKtwKkhw"

    def __init__(
        self,
        api_key: Optional[str] = None,
        wahlperiode: int = 21,
        fetch_sources: Optional[List[str]] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize DIP client.

        Args:
            api_key: DIP API key. If not provided, uses public test key.
            wahlperiode: Electoral period number (default: 21).
            fetch_sources: List of data sources to fetch.
                Options: "protocols", "drucksachen", "proceedings"
                (default: ["protocols", "drucksachen"]).
            logger: Logger instance for logging.
        """
        self.api_key = api_key or self.DEFAULT_API_KEY
        self.wahlperiode = wahlperiode
        self.fetch_sources = fetch_sources or ["protocols", "drucksachen"]
        self.logger = logger or logging.getLogger(__name__)

        # Configure deutschland package
        self.configuration = dip_bundestag.Configuration(
            host="https://search.dip.bundestag.de/api/v1"
        )
        # Use query parameter auth (more reliable)
        self.configuration.api_key["ApiKeyQuery"] = self.api_key

        self.logger.info(
            f"Initialized DIP client for Wahlperiode {self.wahlperiode}, "
            f"sources: {self.fetch_sources}"
        )

    def fetch_all(self) -> Iterator[DIPDocument]:
        """
        Fetch all enabled data sources.

        Yields:
            DIPDocument instances wrapping different content types.
        """
        with dip_bundestag.ApiClient(self.configuration) as api_client:
            if "protocols" in self.fetch_sources:
                yield from self._fetch_protocols(api_client)

            if "drucksachen" in self.fetch_sources:
                yield from self._fetch_drucksachen(api_client)

            if "proceedings" in self.fetch_sources:
                yield from self._fetch_proceedings(api_client)

    def _fetch_protocols(self, api_client) -> Iterator[DIPDocument]:
        """
        Fetch plenary protocols with full text.

        Errors are logged, never raised, so one failed protocol does not
        abort the overall fetch.

        Args:
            api_client: deutschland API client instance.

        Yields:
            DIPDocument instances for protocols.
        """
        self.logger.info(
            f"Fetching protocols for Wahlperiode {self.wahlperiode}"
        )

        protokoll_api = plenarprotokolle_api.PlenarprotokolleApi(api_client)

        try:
            # Get list of protocols - need to paginate to find Bundestag protocols
            # The API returns Bundesrat protocols first, so we need to paginate
            all_bt_protocols = []
            cursor = "*"
            max_pages = 10  # Limit pagination to avoid excessive API calls
            page_count = 0

            while cursor and page_count < max_pages:
                response = protokoll_api.get_plenarprotokoll_list(
                    f_wahlperiode=self.wahlperiode, format="json", cursor=cursor
                )

                # Filter for Bundestag protocols only (not Bundesrat)
                # herausgeber is a Zuordnung object, need to convert to string
                bt_protocols = [
                    p
                    for p in response.documents
                    if str(getattr(p, "herausgeber", "")) == "BT"
                ]
                all_bt_protocols.extend(bt_protocols)

                # Check if we have more results
                cursor = getattr(response, "cursor", None)
                page_count += 1

                self.logger.debug(
                    f"Page {page_count}: Found {len(bt_protocols)} BT protocols, "
                    f"total so far: {len(all_bt_protocols)}"
                )

                # Stop if we have enough
                if len(all_bt_protocols) >= 50:  # Reasonable limit for testing
                    break

            protocol_ids = [p.id for p in all_bt_protocols]

            self.logger.info(
                f"Found {len(protocol_ids)} Bundestag protocols "
                f"for Wahlperiode {self.wahlperiode} (across {page_count} pages)"
            )

            # Fetch full text for each protocol
            for protocol_id in protocol_ids:
                try:
                    # API expects integer ID
                    fulltext = protokoll_api.get_plenarprotokoll_text(
                        id=int(protocol_id), format="json"
                    )

                    # Only yield if we have text
                    if hasattr(fulltext, "text") and fulltext.text:
                        # Convert to dict for serialization
                        content_dict = fulltext.to_dict()

                        yield DIPDocument(
                            source_type="protocol", content=content_dict
                        )

                        dokumentnummer = getattr(
                            fulltext, "dokumentnummer", "unknown"
                        )
                        text_length = len(fulltext.text)
                        self.logger.debug(
                            f"Fetched protocol {dokumentnummer} "
                            f"({text_length} chars)"
                        )

                except Exception as e:
                    self.logger.warning(
                        f"Failed to fetch protocol {protocol_id}: {e}"
                    )
                    continue

        except Exception as e:
            self.logger.error(f"Failed to list protocols: {e}", exc_info=True)

    def _fetch_drucksachen(self, api_client) -> Iterator[DIPDocument]:
        """
        Fetch printed materials (drucksachen) with full text.

        Documents without full text are skipped silently (debug log only).

        Args:
            api_client: deutschland API client instance.

        Yields:
            DIPDocument instances for drucksachen.
        """
        self.logger.info(
            f"Fetching drucksachen for Wahlperiode {self.wahlperiode}"
        )

        drucksache_api = drucksachen_api.DrucksachenApi(api_client)

        try:
            cursor = None
            fetched_count = 0
            page = 1

            while True:
                # Fetch page of documents
                cursor_param = cursor if cursor else ""
                response = drucksache_api.get_drucksache_list(
                    f_wahlperiode=self.wahlperiode,
                    cursor=cursor_param,
                    format="json",
                )

                # Try to get full text for each document
                for doc_meta in response.documents:
                    try:
                        fulltext = drucksache_api.get_drucksache_text(
                            id=doc_meta.id, format="json"
                        )

                        # Only yield if we have text
                        if hasattr(fulltext, "text") and fulltext.text:
                            # Convert to dict for serialization
                            content_dict = fulltext.to_dict()

                            yield DIPDocument(
                                source_type="drucksache", content=content_dict
                            )

                            fetched_count += 1

                    except Exception:
                        # Many drucksachen don't have full text, log at debug level
                        self.logger.debug(
                            f"No full text for drucksache {doc_meta.id}"
                        )
                        continue

                # Log progress
                self.logger.info(
                    f"Drucksachen: page {page} complete, "
                    f"{fetched_count} with full text so far"
                )

                # Check pagination: an unchanged cursor means the last page
                # was reached.
                new_cursor = getattr(response, "cursor", None)
                if not new_cursor or new_cursor == cursor:
                    break

                cursor = new_cursor
                page += 1

            self.logger.info(
                f"Completed fetching drucksachen. "
                f"Total with full text: {fetched_count}"
            )

        except Exception as e:
            self.logger.error(
                f"Failed to fetch drucksachen: {e}", exc_info=True
            )

    def _fetch_proceedings(self, api_client) -> Iterator[DIPDocument]:
        """
        Fetch proceedings (vorgänge) - legislative processes.

        Args:
            api_client: deutschland API client instance.

        Yields:
            DIPDocument instances for proceedings.
        """
        self.logger.info(
            f"Fetching proceedings for Wahlperiode {self.wahlperiode}"
        )

        # NOTE: "vorgnge_api" / "VorgngeApi" are the generated module/class
        # names (umlaut dropped from "Vorgänge") — presumably from the
        # OpenAPI code generator; confirm against the deutschland package.
        vorgang_api = vorgnge_api.VorgngeApi(api_client)

        try:
            cursor = None
            fetched_count = 0
            page = 1

            while True:
                # Fetch page of proceedings
                cursor_param = cursor if cursor else ""
                response = vorgang_api.get_vorgang_list(
                    f_wahlperiode=self.wahlperiode,
                    cursor=cursor_param,
                    format="json",
                )

                for vorgang in response.documents:
                    # Convert to dict for serialization
                    content_dict = vorgang.to_dict()

                    yield DIPDocument(
                        source_type="proceeding", content=content_dict
                    )

                    fetched_count += 1

                # Log progress
                self.logger.info(
                    f"Proceedings: page {page} complete, "
                    f"{fetched_count} total so far"
                )

                # Check pagination: an unchanged cursor means the last page
                # was reached.
                new_cursor = getattr(response, "cursor", None)
                if not new_cursor or new_cursor == cursor:
                    break

                cursor = new_cursor
                page += 1

            self.logger.info(
                f"Completed fetching proceedings. " f"Total: {fetched_count}"
            )

        except Exception as e:
            self.logger.error(
                f"Failed to fetch proceedings: {e}", exc_info=True
            )

__init__(api_key=None, wahlperiode=21, fetch_sources=None, logger=None)

Initialize DIP client.

Parameters:
  • api_key (Optional[str], default: None ) –

    DIP API key. If not provided, uses public test key.

  • wahlperiode (int, default: 21 ) –

    Electoral period number (default: 21).

  • fetch_sources (Optional[List[str]], default: None ) –

    List of data sources to fetch. Options: "protocols", "drucksachen", "proceedings"

  • logger (Optional[Logger], default: None ) –

    Logger instance for logging.

Source code in src/extraction/datasources/bundestag/client_dip.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __init__(
    self,
    api_key: Optional[str] = None,
    wahlperiode: int = 21,
    fetch_sources: Optional[List[str]] = None,
    logger: Optional[logging.Logger] = None,
):
    """
    Initialize DIP client.

    Args:
        api_key: DIP API key. If not provided, uses public test key.
        wahlperiode: Electoral period number (default: 21).
        fetch_sources: List of data sources to fetch.
            Options: "protocols", "drucksachen", "proceedings"
            (default: ["protocols", "drucksachen"]).
        logger: Logger instance for logging.
    """
    self.api_key = api_key or self.DEFAULT_API_KEY
    self.wahlperiode = wahlperiode
    self.fetch_sources = fetch_sources or ["protocols", "drucksachen"]
    self.logger = logger or logging.getLogger(__name__)

    # Configure deutschland package
    self.configuration = dip_bundestag.Configuration(
        host="https://search.dip.bundestag.de/api/v1"
    )
    # Use query parameter auth (more reliable)
    self.configuration.api_key["ApiKeyQuery"] = self.api_key

    self.logger.info(
        f"Initialized DIP client for Wahlperiode {self.wahlperiode}, "
        f"sources: {self.fetch_sources}"
    )

fetch_all()

Fetch all enabled data sources.

Yields:
  • DIPDocument

    DIPDocument instances wrapping different content types.

Source code in src/extraction/datasources/bundestag/client_dip.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def fetch_all(self) -> Iterator[DIPDocument]:
    """
    Fetch all enabled data sources.

    Iterates the known source kinds in a fixed order (protocols,
    drucksachen, proceedings) and yields documents only from the
    sources enabled in ``self.fetch_sources``.

    Yields:
        DIPDocument instances wrapping different content types.
    """
    with dip_bundestag.ApiClient(self.configuration) as api_client:
        # Fixed ordering mirrors the documented precedence of sources.
        fetchers = (
            ("protocols", self._fetch_protocols),
            ("drucksachen", self._fetch_drucksachen),
            ("proceedings", self._fetch_proceedings),
        )
        for source_name, fetch in fetchers:
            if source_name in self.fetch_sources:
                yield from fetch(api_client)

DIPDocument

Bases: BaseModel

Unified model for all DIP data types.

This wrapper provides a consistent interface for different document types from the DIP API (protocols, drucksachen, proceedings).

Source code in src/extraction/datasources/bundestag/client_dip.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class DIPDocument(BaseModel):
    """
    Unified model for all DIP data types.

    This wrapper provides a consistent interface for different
    document types from the DIP API (protocols, drucksachen, proceedings).
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    source_type: str  # "protocol", "proceeding", "drucksache"
    content: Dict[str, Any]  # Raw content from deutschland package

    @property
    def text(self) -> str:
        """Extract text based on source type."""
        # Protocols and Drucksachen carry full text; proceedings expose
        # only an abstract. Unknown source types yield an empty string.
        content_field = {
            "protocol": "text",
            "drucksache": "text",
            "proceeding": "abstract",
        }.get(self.source_type)
        if content_field is None:
            return ""
        return self.content.get(content_field, "")

text property

Extract text based on source type.

Configuration

BundestagMineDatasourceConfiguration

Bases: DatasourceConfiguration

Configuration for the Bundestag datasource.

Supports multiple data sources: - BundestagMine API: Individual speeches from parliamentary sessions - DIP API: Comprehensive parliamentary documents (protocols, drucksachen, proceedings)

Source code in src/extraction/datasources/bundestag/configuration.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class BundestagMineDatasourceConfiguration(DatasourceConfiguration):
    """
    Configuration for the Bundestag datasource.

    Supports multiple data sources:
    - BundestagMine API: Individual speeches from parliamentary sessions
    - DIP API: Comprehensive parliamentary documents (protocols, drucksachen, proceedings)
    """

    # Discriminator field: pins this configuration to the Bundestag datasource.
    name: Literal[DatasourceName.BUNDESTAG] = Field(
        ...,
        description="Identifier specifying this configuration is for the Bundestag datasource",
    )

    # BundestagMine settings
    include_bundestag_mine: bool = Field(
        default=True,
        description="Fetch speeches from BundestagMine API (bundestag-mine.de)",
    )

    # DIP API settings
    include_dip: bool = Field(
        default=True,
        description="Fetch data from DIP (Dokumentations- und Informationssystem) API",
    )

    dip_api_key: Optional[str] = Field(
        default=None,
        description="API key for DIP API. If not provided, uses public test key.",
    )

    # Default of 21 corresponds to the current electoral period.
    dip_wahlperiode: int = Field(
        default=21,
        description="Electoral period (Wahlperiode) for DIP data",
    )

    # Only "protocols" is fetched by default; the other DIP sources must be
    # enabled explicitly.
    dip_sources: List[str] = Field(
        default_factory=lambda: ["protocols"],
        description=(
            "DIP data sources to fetch. "
            "Options: 'protocols' (plenary transcripts), "
            "'drucksachen' (printed materials), "
            "'proceedings' (legislative processes)"
        ),
    )

Document

BundestagMineDocument

Bases: BaseDocument

Represents a document from the Bundestag datasource.

Supports multiple data sources: - BundestagMine: Individual speeches with speaker information - DIP API: Comprehensive parliamentary documents (protocols, drucksachen, proceedings)

Inherits from BaseDocument and includes additional metadata specific to Bundestag documents.

Source code in src/extraction/datasources/bundestag/document.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class BundestagMineDocument(BaseDocument):
    """
    Represents a document from the Bundestag datasource.

    Supports multiple data sources:
    - BundestagMine: Individual speeches with speaker information
    - DIP API: Comprehensive parliamentary documents (protocols, drucksachen, proceedings)

    Inherits from BaseDocument and includes additional metadata specific to Bundestag documents.
    """

    # Metadata keys exposed to the embedding pipeline. A subset of the LLM
    # keys below: omits distribution_date, xml_url and
    # related_proceedings_count.
    included_embed_metadata_keys: List[str] = [
        "title",
        "created_time",
        "last_edited_time",
        "speaker_party",
        "speaker",
        "protocol_number",
        "legislature_period",
        "document_type",
        "document_number",
        "document_subtype",
        "agenda_item_number",
        "source_client",
        "publisher",
        "document_art",
        "document_id",
        "parliamentary_composition",  # Party/fraction composition metadata
    ]

    # Metadata keys exposed to the LLM; superset of the embed keys above.
    included_llm_metadata_keys: List[str] = [
        "title",
        "created_time",
        "last_edited_time",
        "speaker_party",
        "speaker",
        "protocol_number",
        "legislature_period",
        "document_type",
        "document_number",
        "document_subtype",
        "agenda_item_number",
        "source_client",
        "publisher",
        "document_art",
        "document_id",
        "distribution_date",
        "xml_url",
        "related_proceedings_count",
        "parliamentary_composition",  # Party/fraction composition metadata
    ]

Manager

BundestagMineDatasourceManagerFactory

Bases: Factory

Factory for creating datasource managers.

Provides type-safe creation of datasource managers based on configuration.

Attributes:
  • _configuration_class (Type) –

    Type of configuration object

Source code in src/extraction/datasources/bundestag/manager.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class BundestagMineDatasourceManagerFactory(Factory):
    """Factory for creating datasource managers.

    Provides type-safe creation of datasource managers based on configuration.

    Attributes:
        _configuration_class: Type of configuration object
    """

    _configuration_class: Type = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BasicDatasourceManager:
        """Create an instance of the BundestagMine datasource manager.

        Builds the reader and parser from the same configuration and wires
        them into a BasicDatasourceManager.

        Args:
            configuration: Configuration specifying how to set up the BundestagMine datasource
                          manager, reader, and parser.

        Returns:
            A configured BasicDatasourceManager instance for handling BundestagMine documents.
        """
        # Reader is constructed before the parser (keyword arguments are
        # evaluated left-to-right), matching the original creation order.
        return BasicDatasourceManager(
            configuration=configuration,
            reader=BundestagMineDatasourceReaderFactory.create(configuration),
            parser=BundestagMineDatasourceParserFactory.create(configuration),
        )

Parser

BundestagMineDatasourceParser

Bases: BaseParser[BundestagMineDocument]

Parser for Bundestag data from multiple sources.

Handles parsing of: - BundestagMine speeches (individual speech data) - DIP documents (protocols, drucksachen, proceedings)

Source code in src/extraction/datasources/bundestag/parser.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
class BundestagMineDatasourceParser(BaseParser[BundestagMineDocument]):
    """Parser for Bundestag data from multiple sources.

    Handles parsing of:
    - BundestagMine speeches (individual speech data)
    - DIP documents (protocols, drucksachen, proceedings)
    """

    logger = LoggerConfiguration.get_logger(__name__)

    def parse(
        self, content: Union[BundestagSpeech, DIPDocument]
    ) -> BundestagMineDocument:
        """
        Parse content into a BundestagMineDocument object.

        Args:
            content: Raw content to be parsed (BundestagSpeech or DIPDocument)

        Returns:
            Parsed document of type BundestagMineDocument

        Raises:
            ValueError: If content is neither a BundestagSpeech nor a
                DIPDocument.
        """
        if isinstance(content, BundestagSpeech):
            return self._parse_bundestag_speech(content)
        elif isinstance(content, DIPDocument):
            return self._parse_dip_document(content)
        else:
            raise ValueError(f"Unsupported content type: {type(content)}")

    def _parse_bundestag_speech(
        self, speech: BundestagSpeech
    ) -> BundestagMineDocument:
        """Parse a BundestagMine speech into a document.

        Args:
            speech: BundestagSpeech object

        Returns:
            BundestagMineDocument with speech data
        """
        metadata = self._extract_metadata_from_speech(speech)
        return BundestagMineDocument(text=speech.text, metadata=metadata)

    def _parse_dip_document(
        self, dip_doc: DIPDocument
    ) -> BundestagMineDocument:
        """Parse a DIP document into a BundestagMineDocument.

        Args:
            dip_doc: DIPDocument object

        Returns:
            BundestagMineDocument with DIP data
        """
        metadata = self._extract_metadata_from_dip(dip_doc)

        # Filter protocol text to remove non-informative sections
        # (only protocols carry Anlagen / attendance lists).
        text = dip_doc.text
        if dip_doc.source_type == "protocol":
            text = self._filter_protocol_text(text)

        return BundestagMineDocument(text=text, metadata=metadata)

    def _filter_protocol_text(self, text: str) -> str:
        """Filter protocol text to remove non-informative sections.

        Removes:
        - Anlage sections (attachments with attendance lists, voting records)
        - Content after "Anlagen zum Stenografischen Bericht" marker
        - Name list sections (lines with >80% proper nouns, no verbs)

        Args:
            text: Raw protocol text

        Returns:
            Filtered text with only substantive content
        """
        # Line-by-line state machine: `in_anlage_section` and `in_name_list`
        # track whether the current line lies inside a section being dropped;
        # `removed_lines` and `name_list_start` are used for logging only.
        lines = text.split("\n")
        filtered_lines = []
        in_anlage_section = False
        in_name_list = False
        removed_lines = 0
        name_list_start = -1

        for i, line in enumerate(lines):
            stripped = line.strip()

            # Check for start of Anlagen section (usually near the end)
            if stripped.startswith("Anlagen zum Stenografischen Bericht"):
                self.logger.debug(
                    f"Found 'Anlagen zum Stenografischen Bericht' at line {i}, "
                    f"removing remaining {len(lines) - i} lines"
                )
                removed_lines += len(lines) - i
                break

            # Check for individual Anlage markers followed by minimal content
            if stripped.startswith("Anlage"):
                # Look ahead to see if this is a low-content section
                next_lines = lines[i + 1 : min(i + 20, len(lines))]
                non_empty_next = [l.strip() for l in next_lines if l.strip()]

                # If Anlage is followed by very few words or just names/numbers,
                # it's likely an attachment section
                if len(non_empty_next) <= 3:
                    in_anlage_section = True
                    self.logger.debug(
                        f"Found standalone Anlage at line {i}: {stripped[:50]}"
                    )
                    removed_lines += 1
                    continue
                # Check if followed by attendance/voting list markers
                elif any(
                    marker in " ".join(non_empty_next[:5])
                    for marker in [
                        "Entschuldigte Abgeordnete",
                        "Namensverzeichnis",
                        "Ergebnis und Namensverzeichnis",
                    ]
                ):
                    in_anlage_section = True
                    self.logger.debug(
                        f"Found Anlage with attendance/voting list at line {i}"
                    )
                    removed_lines += 1
                    continue

            # Exit Anlage section when we hit substantive content
            if in_anlage_section:
                # Check for speaker pattern (name followed by colon)
                # NOTE(review): the `":" in line` test is implied by
                # `stripped.endswith(":")` — kept as-is for safety.
                if (
                    ":" in line
                    and len(stripped) > 10
                    and stripped.endswith(":")
                ):
                    in_anlage_section = False
                    self.logger.debug(f"Exiting Anlage section at line {i}")
                # Check for long paragraph (likely substantive content)
                elif len(stripped) > 100:
                    in_anlage_section = False
                    self.logger.debug(f"Exiting Anlage section at line {i}")
                else:
                    removed_lines += 1
                    continue

            # Detect name list sections (lines with mostly proper nouns, no verbs)
            if not in_name_list and self._is_name_list_line(stripped):
                # Look ahead to see if this is start of a name list section
                next_lines = lines[i + 1 : min(i + 10, len(lines))]
                name_count = sum(
                    1
                    for l in next_lines
                    if l.strip() and self._is_name_list_line(l.strip())
                )

                # If 5+ consecutive name-like lines, it's a name list section
                if name_count >= 5:
                    in_name_list = True
                    name_list_start = i
                    removed_lines += 1
                    self.logger.debug(f"Entering name list section at line {i}")
                    continue

            # Exit name list when we hit substantive content
            if in_name_list:
                # Check for speaker pattern or long substantive text
                if (
                    (":" in line and stripped.endswith(":"))
                    or len(stripped) > 150
                    or self._has_verbs(stripped)
                ):
                    in_name_list = False
                    list_length = i - name_list_start
                    self.logger.debug(
                        f"Exited name list section at line {i} (removed {list_length} lines)"
                    )
                else:
                    removed_lines += 1
                    continue

            filtered_lines.append(line)

        filtered_text = "\n".join(filtered_lines)

        if removed_lines > 0:
            self.logger.info(
                f"Filtered protocol: removed {removed_lines} lines "
                f"({len(text) - len(filtered_text)} chars) of non-substantive content"
            )

        return filtered_text

    def _is_name_list_line(self, line: str) -> bool:
        """Check if a line looks like a name list entry.

        Name list characteristics:
        - Short line (<= 80 chars)
        - 2-5 words
        - Nearly all words start with a capital letter (one lowercase word
          or a German name particle is tolerated)
        - No verbs or sentence structure

        Args:
            line: Line to check

        Returns:
            True if line looks like a name entry
        """
        if not line or len(line) > 80:
            return False

        words = line.split()
        if not (2 <= len(words) <= 5):
            return False

        # Check if all words are capitalized (typical for names)
        # Allow for common German particles: von, van, de, der
        particles = {"von", "van", "de", "der", "den", "zu"}
        capitalized_count = sum(
            1 for w in words if w[0].isupper() or w.lower() in particles
        )

        return (
            capitalized_count >= len(words) - 1
        )  # Allow one non-capitalized word

    def _has_verbs(self, text: str) -> bool:
        """Check if text contains common German verbs (indicates substantive content).

        Args:
            text: Text to check

        Returns:
            True if text contains verbs
        """
        # Common German verbs and verb patterns
        verb_indicators = [
            " ist ",
            " sind ",
            " war ",
            " waren ",
            " hat ",
            " haben ",
            " hatte ",
            " wird ",
            " werden ",
            " wurde ",
            " wurden ",
            " kann ",
            " können ",
            " soll ",
            " muss ",
            " möchte ",
            " sage ",
            " sagen ",
            " glaube ",
            " denke ",
            " meine ",
            " macht ",
            " machen ",
            " gibt ",
            " geht ",
        ]

        # Pad with spaces so word-boundary matching works at string edges.
        text_lower = f" {text.lower()} "
        return any(verb in text_lower for verb in verb_indicators)

    def _extract_metadata_from_speech(self, speech: BundestagSpeech) -> dict:
        """
        Extract metadata from a BundestagMine speech.

        Args:
            speech: BundestagSpeech object

        Returns:
            Dictionary containing extracted metadata
        """
        legislature_period = speech.protocol.legislaturePeriod
        protocol_number = speech.protocol.number
        agenda_item_number = speech.agendaItem.agendaItemNumber

        url = f"https://dserver.bundestag.de/btp/{legislature_period}/{legislature_period}{protocol_number}.pdf"
        title = f"Protocol/Legislature/AgendaItem {protocol_number}/{legislature_period}/{agenda_item_number}"
        speaker_name = f"{speech.speaker.firstName} {speech.speaker.lastName}"

        metadata = {
            "datasource": "bundestag",
            "source_client": "bundestag_mine",
            "language": "de",
            "url": url,
            "title": title,
            "format": "md",
            "created_time": speech.protocol.date,
            "last_edited_time": speech.protocol.date,
            "speaker_party": speech.speaker.party,
            "speaker": speaker_name,
            "agenda_item_number": agenda_item_number,
            "protocol_number": protocol_number,
            "document_number": f"{legislature_period}/{protocol_number}",  # Standardized identifier
            "legislature_period": legislature_period,
            "document_type": "speech",
        }

        # Extract party composition from speaker metadata
        if speech.speaker and speech.speaker.party:
            parliamentary_composition = (
                PartyExtractor.extract_from_speaker_party(speech.speaker.party)
            )
            metadata["parliamentary_composition"] = parliamentary_composition

        return metadata

    def _extract_metadata_from_dip(self, dip_doc: DIPDocument) -> dict:
        """
        Extract metadata from a DIP document.

        Args:
            dip_doc: DIPDocument object

        Returns:
            Dictionary containing extracted metadata
        """
        content = dip_doc.content
        source_type = dip_doc.source_type

        # Extract common fields present in all DIP documents
        metadata = {
            "datasource": "bundestag",
            "source_client": "dip",
            "language": "de",
            "format": "md",
            "document_type": source_type,
        }

        # Extract common metadata across all document types
        fundstelle = content.get("fundstelle", {})

        # Add document ID if available
        if "id" in content:
            metadata["document_id"] = str(content["id"])

        # Add publisher (herausgeber) if available
        if "herausgeber" in content:
            metadata["publisher"] = str(content["herausgeber"])

        # Add document art (dokumentart) if available
        if "dokumentart" in content:
            metadata["document_art"] = content["dokumentart"]

        # Extract type-specific metadata
        if source_type == "protocol":
            # Plenary protocol metadata
            dokumentnummer = content.get("dokumentnummer", "unknown")
            wahlperiode = content.get("wahlperiode", "")
            titel = content.get("titel", f"Plenary Protocol {dokumentnummer}")

            metadata.update(
                {
                    "title": titel,
                    "protocol_number": dokumentnummer,
                    "document_number": dokumentnummer,  # Standardized identifier (same as protocol_number for protocols)
                    "legislature_period": wahlperiode,
                    "url": fundstelle.get("pdf_url", ""),
                    "created_time": content.get("datum", ""),
                    "last_edited_time": content.get(
                        "aktualisiert", content.get("datum", "")
                    ),
                }
            )

            # Add fundstelle reference metadata
            if "verteildatum" in fundstelle:
                metadata["distribution_date"] = fundstelle["verteildatum"]
            if "xml_url" in fundstelle:
                metadata["xml_url"] = fundstelle["xml_url"]

            # Add vorgangsbezug (proceedings references) count
            if "vorgangsbezug_anzahl" in content:
                metadata["related_proceedings_count"] = content[
                    "vorgangsbezug_anzahl"
                ]

            # Extract parliamentary composition from protocol text
            text = content.get("text", "")
            parliamentary_composition = (
                PartyExtractor.extract_from_protocol_text(text)
            )
            metadata["parliamentary_composition"] = parliamentary_composition

            # Log extraction results
            num_fractions = len(parliamentary_composition.get("fractions", []))
            confidence = parliamentary_composition.get("confidence", "unknown")
            self.logger.info(
                f"Extracted {num_fractions} fractions from protocol {dokumentnummer} "
                f"(confidence: {confidence})"
            )

        elif source_type == "drucksache":
            # Printed material metadata
            dokumentnummer = content.get("dokumentnummer", "unknown")
            wahlperiode = content.get("wahlperiode", "")
            drucksachetyp = content.get("drucksachetyp", "")

            metadata.update(
                {
                    "title": f"Drucksache {dokumentnummer}",
                    "document_number": dokumentnummer,
                    "document_subtype": drucksachetyp,
                    "legislature_period": wahlperiode,
                    "url": fundstelle.get("pdf_url", ""),
                    "created_time": content.get("datum", ""),
                    "last_edited_time": content.get(
                        "aktualisiert", content.get("datum", "")
                    ),
                }
            )

            # Add fundstelle reference metadata
            if "verteildatum" in fundstelle:
                metadata["distribution_date"] = fundstelle["verteildatum"]
            if "xml_url" in fundstelle:
                metadata["xml_url"] = fundstelle["xml_url"]

        elif source_type == "proceeding":
            # Legislative proceeding metadata
            titel = content.get("titel", "")
            vorgangsnummer = content.get("vorgangsnummer", "unknown")
            wahlperiode = content.get("wahlperiode", "")

            metadata.update(
                {
                    "title": titel or f"Proceeding {vorgangsnummer}",
                    "document_number": vorgangsnummer,
                    "legislature_period": wahlperiode,
                    "url": fundstelle.get("url", ""),
                    "created_time": content.get("datum", ""),
                    "last_edited_time": content.get(
                        "aktualisiert", content.get("datum", "")
                    ),
                }
            )

        return metadata

parse(content)

Parse content into a BundestagMineDocument object.

Parameters:
  • content (Union[BundestagSpeech, DIPDocument]) –

    Raw content to be parsed (BundestagSpeech or DIPDocument)

Returns:
  • BundestagMineDocument

    Parsed document of type BundestagMineDocument

Source code in src/extraction/datasources/bundestag/parser.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def parse(
    self, content: Union[BundestagSpeech, DIPDocument]
) -> BundestagMineDocument:
    """
    Parse content into a BundestagMineDocument object.

    Dispatches to the type-specific parser for the given content.

    Args:
        content: Raw content to be parsed (BundestagSpeech or DIPDocument)

    Returns:
        Parsed document of type BundestagMineDocument

    Raises:
        ValueError: If content is neither supported type.
    """
    # Guard-clause dispatch; check order matches the original.
    if isinstance(content, BundestagSpeech):
        return self._parse_bundestag_speech(content)
    if isinstance(content, DIPDocument):
        return self._parse_dip_document(content)
    raise ValueError(f"Unsupported content type: {type(content)}")

BundestagMineDatasourceParserFactory

Bases: Factory

Factory for creating instances of BundestagMineDatasourceParser.

Creates and configures BundestagMineDatasourceParser objects according to the provided configuration.

Source code in src/extraction/datasources/bundestag/parser.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
class BundestagMineDatasourceParserFactory(Factory):
    """
    Factory for creating instances of BundestagMineDatasourceParser.

    Creates and configures BundestagMineDatasourceParser objects according to
    the provided configuration.
    """

    # Configuration type this factory accepts.
    _configuration_class: Type = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BundestagMineDatasourceParser:
        """
        Create an instance of BundestagMineDatasourceParser.

        Args:
            configuration: Configuration for the parser (not used in this
                implementation; the parser is stateless, but the parameter is
                kept for interface symmetry with other factories)

        Returns:
            An instance of BundestagMineDatasourceParser
        """
        return BundestagMineDatasourceParser()

Party_extractor

Extract parliamentary composition metadata from Bundestag documents.

This module uses DYNAMIC extraction without hardcoded party names, making it future-proof for new parties and changing compositions.

PartyExtractor

Extracts party/fraction composition dynamically from protocol text.

Design principle: Extract ALL party mentions using pattern matching and heuristics, without hardcoding specific party names.

Source code in src/extraction/datasources/bundestag/party_extractor.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class PartyExtractor:
    """Extracts party/fraction composition dynamically from protocol text.

    Design principle: Extract ALL party mentions using pattern matching
    and heuristics, without hardcoding specific party names.
    """

    logger = LoggerConfiguration.get_logger(__name__)

    # Non-party keywords to filter out (roles, locations, organizations, etc.)
    # NOTE: _is_likely_party rejects any candidate that CONTAINS one of these
    # as a substring, so keep entries specific enough not to hit party names.
    NON_PARTY_KEYWORDS = {
        # Governmental roles
        "Bundeskanzler",
        "Bundeskanzlerin",
        "Bundesminister",
        "Bundesministerin",
        "Bundespräsident",
        "Bundespräsidentin",
        "Präsident",
        "Präsidentin",
        "Staatsminister",
        "Staatsministerin",
        "Staatssekretär",
        "Staatssekretärin",
        # Locations
        "Berlin",
        "Bonn",
        # Status
        "parteilos",
        "fraktionslos",
        "Gast",  # Guest speakers
        # Common organizational abbreviations (NOT parties)
        "EU",
        "UN",
        "NATO",
        "OSZE",
        "WHO",
        "IWF",
        "EZB",
        "BMWE",
        "BMI",
        "BMF",
        "BMAS",
        "BMZ",
        "BMVI",
        "BMVg",  # German ministries
        "BT",
        "BR",  # Bundestag/Bundesrat abbreviations
        "USA",
        "UK",
        "FR",  # Countries
        # Procedural terms
        "TOP",
        "ZP",  # Tagesordnungspunkt, Zusatzpunkt
    }

    @classmethod
    def _utc_now_iso(cls) -> str:
        """Return the current UTC time as an ISO-8601 string.

        Uses a timezone-aware datetime: ``datetime.utcnow()`` is deprecated
        since Python 3.12 and produces a naive (offset-less) timestamp.
        """
        # Local import keeps the module-level import surface unchanged.
        from datetime import timezone

        return datetime.now(timezone.utc).isoformat()

    @classmethod
    def extract_from_protocol_text(cls, text: str) -> Dict:
        """Extract party composition from DIP protocol text dynamically.

        Uses pattern matching and heuristics to identify parties
        WITHOUT hardcoded party names.

        Args:
            text: Full protocol text from DIP API

        Returns:
            Parliamentary composition metadata dictionary
        """
        if not text:
            return cls._empty_result()

        # Pattern: Matches "Name (PARTY)" speaker attributions
        # More flexible pattern that matches various name formats:
        # - "Hans Müller (CDU/CSU)"
        # - "Dr. Maria Schmidt (SPD)"
        # - "Speaker1 (CDU)" (for tests)
        # Captures name and content in parentheses
        pattern = r"(\b[A-ZÄÖÜa-zäöüß][A-ZÄÖÜa-zäöüß0-9\.\s]*?)\s+\(([^)]+)\)"

        matches = re.findall(pattern, text)

        # Extract candidates: text in parentheses after names
        candidates = [match[1].strip() for match in matches]

        # Filter to likely parties using heuristics
        party_candidates = []
        for candidate in candidates:
            if cls._is_likely_party(candidate):
                party_candidates.append(candidate)

        if not party_candidates:
            cls.logger.debug("No party candidates found in protocol text")
            return cls._empty_result()

        # Count occurrences for confidence scoring
        party_counts = Counter(party_candidates)

        cls.logger.debug(
            f"Found {len(party_counts)} unique party variations "
            f"with {sum(party_counts.values())} total mentions"
        )

        # CRITICAL FILTER: Remove noise by requiring minimum mentions
        # Real parties appear throughout the protocol (many mentions)
        # Noise abbreviations (government agencies, technical terms) appear rarely (1 mention)
        # Threshold: At least 2 mentions to be considered a party
        # This filters out single-occurrence noise while catching real parties
        MIN_MENTIONS = 2
        filtered_party_counts = Counter(
            {
                name: count
                for name, count in party_counts.items()
                if count >= MIN_MENTIONS
            }
        )

        if not filtered_party_counts:
            cls.logger.debug(
                f"After filtering for min {MIN_MENTIONS} mentions, no parties remain"
            )
            return cls._empty_result()

        cls.logger.debug(
            f"After filtering for min {MIN_MENTIONS} mentions: "
            f"{len(filtered_party_counts)} candidates remain"
        )

        # Group related party names (e.g., CDU, CSU, CDU/CSU → CDU/CSU)
        party_groups = cls._group_related_parties(filtered_party_counts)

        cls.logger.info(f"Grouped into {len(party_groups)} distinct fractions")

        # Build fractions list
        fractions = []
        for primary_name, related_names in party_groups.items():
            total_mentions = sum(
                filtered_party_counts[name] for name in related_names
            )

            fractions.append(
                {
                    "name": primary_name,
                    "variations": sorted(related_names),
                    "type": "fraction",
                    "mention_count": total_mentions,
                }
            )

        # Sort by mention count (most mentioned first)
        fractions.sort(key=lambda f: f["mention_count"], reverse=True)

        confidence = cls._calculate_confidence(fractions, filtered_party_counts)

        cls.logger.info(
            f"Extracted {len(fractions)} fractions with {confidence} confidence: "
            f"{', '.join(f['name'] for f in fractions)}"
        )

        return {
            "fractions": fractions,
            "extraction_source": "protocol_text",
            "extracted_at": cls._utc_now_iso(),
            "confidence": confidence,
        }

    @classmethod
    def extract_from_speaker_party(cls, party: str) -> Dict:
        """Extract party metadata from a single speech's speaker.party field.

        Stores raw party name without normalization.

        Args:
            party: Party abbreviation from speaker metadata

        Returns:
            Single-party composition metadata
        """
        if not party:
            return cls._empty_result()

        # Store raw party name - NO hardcoded normalization
        fractions = [
            {
                "name": party,
                "variations": [party],
                "type": "fraction",
                "mention_count": 1,
            }
        ]

        return {
            "fractions": fractions,
            "extraction_source": "speaker_metadata",
            "extracted_at": cls._utc_now_iso(),
            "confidence": "low",  # Single speech doesn't show full composition
        }

    @classmethod
    def _is_likely_party(cls, text: str) -> bool:
        """Determine if text is likely a party/fraction name using heuristics.

        CONSERVATIVE approach: Only match text that strongly resembles party names.

        Heuristics (NO hardcoded party names):
        1. Length between 2-20 characters (parties are concise)
        2. Not a common non-party phrase (roles, locations, organizations)
        3. Matches specific party-name patterns:
           a) 2-4 char ALL-CAPS abbreviations (SPD, AfD, CDU, CSU, FDP)
           b) Compound party names with "/" (CDU/CSU, BÜNDNIS 90/DIE GRÜNEN)
           c) Party names starting with "Die " followed by capitalized word
           d) Party names starting with "Bündnis" or "Bund"

        Args:
            text: Candidate text from parentheses

        Returns:
            True if text is likely a party name
        """
        text_clean = text.strip()

        # Must have content
        if not text_clean or len(text_clean) < 2:
            return False

        # STRICTER length check (parties are typically 2-25 chars)
        # Allow up to 25 to accommodate "BÜNDNIS 90/DIE GRÜNEN" (23 chars)
        if len(text_clean) > 25:
            return False

        # Exclude known non-party phrases FIRST
        # (substring match — see the note on NON_PARTY_KEYWORDS)
        if any(keyword in text_clean for keyword in cls.NON_PARTY_KEYWORDS):
            return False

        # Must contain at least one uppercase letter
        if not any(c.isupper() for c in text_clean):
            return False

        # Calculate character composition
        uppercase_count = sum(1 for c in text_clean if c.isupper())
        alpha_count = sum(1 for c in text_clean if c.isalpha())

        if alpha_count == 0:
            return False

        uppercase_ratio = uppercase_count / alpha_count

        # PATTERN 1: Short abbreviations (2-6 characters)
        # Examples: SPD, AfD, CDU, CSU, FDP, BSW, GRÜNE, LINKE
        if 2 <= len(text_clean) <= 6:
            # Must be all letters (no numbers) and at least 2 uppercase letters
            # This allows "AfD" (2 uppercase out of 3) but rejects "EU", "UK" (too international)
            if alpha_count == len(text_clean) and uppercase_count >= 2:
                return True
            return (
                False  # Reject short strings with numbers or too few uppercase
            )

        # PATTERN 2: Compound names with slash "/"
        # Examples: "CDU/CSU", "BÜNDNIS 90/DIE GRÜNEN"
        if "/" in text_clean:
            # Must have at least 50% uppercase
            if uppercase_ratio >= 0.5:
                # Additional check: both sides of "/" should have letters
                parts = text_clean.split("/")
                if len(parts) == 2 and all(
                    any(c.isalpha() for c in p) for p in parts
                ):
                    return True
            return False

        # PATTERN 3: Names starting with "Die " or "DIE "
        # Examples: "Die Linke", "DIE LINKE"
        if text_clean.startswith("Die ") or text_clean.startswith("DIE "):
            # After "Die ", should have at least one more capitalized word
            remaining = text_clean[4:].strip()
            if remaining and remaining[0].isupper():
                return True
            return False

        # PATTERN 4: Names starting with "Bündnis" or "Bund"
        # Examples: "BÜNDNIS 90", "Bündnis"
        if (
            text_clean.startswith("Bündnis")
            or text_clean.startswith("BÜNDNIS")
            or text_clean.startswith("Bund")
        ):
            return True

        # All other patterns rejected
        return False

    @classmethod
    def _group_related_parties(
        cls, party_counts: Counter
    ) -> Dict[str, Set[str]]:
        """Group related party name variations dynamically.

        Groups variations like:
        - "CDU", "CSU", "CDU/CSU" → "CDU/CSU" (longest/compound)
        - "GRÜNE", "DIE GRÜNEN", "BÜNDNIS 90/DIE GRÜNEN" → longest variant
        - "Die Linke", "DIE LINKE" → most common variant

        Strategy: Use Union-Find algorithm to merge related parties.

        Args:
            party_counts: Counter of party name occurrences

        Returns:
            Dict mapping canonical name to set of related names
        """
        parties = list(party_counts.keys())

        # Build Union-Find structure
        # parent[party] = canonical representative of its group
        parent = {p: p for p in parties}

        def find(party):
            """Find root of party's group."""
            if parent[party] != party:
                parent[party] = find(parent[party])  # Path compression
            return parent[party]

        def union(party1, party2):
            """Merge groups of party1 and party2."""
            root1 = find(party1)
            root2 = find(party2)
            if root1 != root2:
                # Merge: prefer party with slash, then longer, then more frequent
                if (
                    1 if "/" in root1 else 0,
                    len(root1),
                    party_counts[root1],
                ) >= (
                    1 if "/" in root2 else 0,
                    len(root2),
                    party_counts[root2],
                ):
                    parent[root2] = root1
                else:
                    parent[root1] = root2

        # Find all related pairs and union them
        for i, party1 in enumerate(parties):
            party1_upper = party1.upper()
            for party2 in parties[i + 1 :]:
                party2_upper = party2.upper()

                # Check if related
                is_substring = (
                    party1_upper in party2_upper or party2_upper in party1_upper
                )
                are_related = cls._are_related_parties(party1, party2)

                if is_substring or are_related:
                    union(party1, party2)

        # Build groups from Union-Find structure
        groups: Dict[str, Set[str]] = {}
        for party in parties:
            root = find(party)
            if root not in groups:
                groups[root] = set()
            groups[root].add(party)

        return groups

    @classmethod
    def _are_related_parties(cls, party1: str, party2: str) -> bool:
        """Check if two party names are related variations.

        Uses word overlap heuristic WITHOUT hardcoded party knowledge.

        Args:
            party1: First party name
            party2: Second party name

        Returns:
            True if parties are related
        """
        p1_upper = party1.upper()
        p2_upper = party2.upper()

        # Extract significant words (length >= 3, all caps)
        words1 = set(re.findall(r"\b[A-ZÄÖÜ]{3,}\b", p1_upper))
        words2 = set(re.findall(r"\b[A-ZÄÖÜ]{3,}\b", p2_upper))

        # Check for shared significant words
        # BUT: "DIE" and "LINKE" are both 3+ chars, so we need to be more careful
        # Only consider them related if they share a MEANINGFUL word
        shared_words = words1 & words2
        if shared_words:
            # Exclude common articles/connectors: DIE, DER, DAS, UND, VON
            meaningful_shared = shared_words - {
                "DIE",
                "DER",
                "DAS",
                "UND",
                "VON",
            }
            if meaningful_shared:
                return True

        return False

    @classmethod
    def _calculate_confidence(
        cls, fractions: List[Dict], party_counts: Counter
    ) -> str:
        """Calculate confidence level based on extraction results.

        Confidence based on:
        - Number of fractions (typical Bundestag has 4-7 fractions)
        - Total mention count (more mentions = higher confidence)

        Args:
            fractions: List of extracted fractions
            party_counts: Counter of raw party mentions

        Returns:
            "high", "medium", or "low"
        """
        num_fractions = len(fractions)
        total_mentions = sum(party_counts.values())

        # Typical Bundestag has 4-7 fractions
        # High confidence: 4+ distinct fractions (typical composition)
        # OR: 2+ fractions with many mentions (confirms representative sample)
        if num_fractions >= 4 or (num_fractions >= 2 and total_mentions >= 20):
            return "high"
        elif num_fractions >= 2 or total_mentions >= 10:
            return "medium"
        else:
            return "low"

    @classmethod
    def _empty_result(cls) -> Dict:
        """Return empty composition metadata for documents with no parties."""
        return {
            "fractions": [],
            "extraction_source": "none",
            "extracted_at": cls._utc_now_iso(),
            "confidence": "low",
        }

extract_from_protocol_text(text) classmethod

Extract party composition from DIP protocol text dynamically.

Uses pattern matching and heuristics to identify parties WITHOUT hardcoded party names.

Parameters:
  • text (str) –

    Full protocol text from DIP API

Returns:
  • Dict

    Parliamentary composition metadata dictionary

Source code in src/extraction/datasources/bundestag/party_extractor.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
@classmethod
def extract_from_protocol_text(cls, text: str) -> Dict:
    """Extract party composition from DIP protocol text dynamically.

    Uses pattern matching and heuristics to identify parties
    WITHOUT hardcoded party names.

    Args:
        text: Full protocol text from DIP API

    Returns:
        Parliamentary composition metadata dictionary
    """
    if not text:
        return cls._empty_result()

    # Pattern: Matches "Name (PARTY)" speaker attributions
    # More flexible pattern that matches various name formats:
    # - "Hans Müller (CDU/CSU)"
    # - "Dr. Maria Schmidt (SPD)"
    # - "Speaker1 (CDU)" (for tests)
    # Captures name and content in parentheses
    # NOTE(review): this regex also matches any "Word (TEXT)" pair, not only
    # speaker attributions; the heuristics below are what filter out non-parties.
    pattern = r"(\b[A-ZÄÖÜa-zäöüß][A-ZÄÖÜa-zäöüß0-9\.\s]*?)\s+\(([^)]+)\)"

    matches = re.findall(pattern, text)

    # Extract candidates: text in parentheses after names
    candidates = [match[1].strip() for match in matches]

    # Filter to likely parties using heuristics
    party_candidates = []
    for candidate in candidates:
        if cls._is_likely_party(candidate):
            party_candidates.append(candidate)

    if not party_candidates:
        cls.logger.debug("No party candidates found in protocol text")
        return cls._empty_result()

    # Count occurrences for confidence scoring
    party_counts = Counter(party_candidates)

    cls.logger.debug(
        f"Found {len(party_counts)} unique party variations "
        f"with {sum(party_counts.values())} total mentions"
    )

    # CRITICAL FILTER: Remove noise by requiring minimum mentions
    # Real parties appear throughout the protocol (many mentions)
    # Noise abbreviations (government agencies, technical terms) appear rarely (1 mention)
    # Threshold: At least 2 mentions to be considered a party
    # This filters out single-occurrence noise while catching real parties
    MIN_MENTIONS = 2
    filtered_party_counts = Counter(
        {
            name: count
            for name, count in party_counts.items()
            if count >= MIN_MENTIONS
        }
    )

    if not filtered_party_counts:
        cls.logger.debug(
            f"After filtering for min {MIN_MENTIONS} mentions, no parties remain"
        )
        return cls._empty_result()

    cls.logger.debug(
        f"After filtering for min {MIN_MENTIONS} mentions: "
        f"{len(filtered_party_counts)} candidates remain"
    )

    # Group related party names (e.g., CDU, CSU, CDU/CSU → CDU/CSU)
    party_groups = cls._group_related_parties(filtered_party_counts)

    cls.logger.info(f"Grouped into {len(party_groups)} distinct fractions")

    # Build fractions list
    # Each fraction records its canonical name, all merged spelling
    # variations, and the total mention count across those variations.
    fractions = []
    for primary_name, related_names in party_groups.items():
        total_mentions = sum(
            filtered_party_counts[name] for name in related_names
        )

        fractions.append(
            {
                "name": primary_name,
                "variations": sorted(list(related_names)),
                "type": "fraction",
                "mention_count": total_mentions,
            }
        )

    # Sort by mention count (most mentioned first)
    fractions.sort(key=lambda f: f["mention_count"], reverse=True)

    confidence = cls._calculate_confidence(fractions, filtered_party_counts)

    cls.logger.info(
        f"Extracted {len(fractions)} fractions with {confidence} confidence: "
        f"{', '.join(f['name'] for f in fractions)}"
    )

    return {
        "fractions": fractions,
        "extraction_source": "protocol_text",
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12
        # and yields a naive timestamp — consider datetime.now(timezone.utc).
        "extracted_at": datetime.utcnow().isoformat(),
        "confidence": confidence,
    }

extract_from_speaker_party(party) classmethod

Extract party metadata from a single speech's speaker.party field.

Stores raw party name without normalization.

Parameters:
  • party (str) –

    Party abbreviation from speaker metadata

Returns:
  • Dict

    Single-party composition metadata

Source code in src/extraction/datasources/bundestag/party_extractor.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@classmethod
def extract_from_speaker_party(cls, party: str) -> Dict:
    """Build single-party composition metadata from a speaker.party field.

    The raw party name is stored as-is; no normalization is applied.

    Args:
        party: Party abbreviation from speaker metadata

    Returns:
        Single-party composition metadata
    """
    if not party:
        return cls._empty_result()

    # Keep the raw party string untouched - no hardcoded normalization.
    fraction_entry = {
        "name": party,
        "variations": [party],
        "type": "fraction",
        "mention_count": 1,
    }

    return {
        "fractions": [fraction_entry],
        "extraction_source": "speaker_metadata",
        "extracted_at": datetime.utcnow().isoformat(),
        # A single speech cannot reveal the full parliamentary composition.
        "confidence": "low",
    }

Reader

BundestagMineDatasourceReader

Bases: BaseReader

Reader for extracting data from multiple Bundestag sources.

Supports multiple data sources:

- BundestagMine API: Individual speeches from parliamentary sessions
- DIP API: Comprehensive parliamentary documents (protocols, drucksachen, proceedings)

Source code in src/extraction/datasources/bundestag/reader.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class BundestagMineDatasourceReader(BaseReader):
    """Reader for extracting data from multiple Bundestag sources.

    Supports multiple data sources:
    - BundestagMine API: Individual speeches from parliamentary sessions
    - DIP API: Comprehensive parliamentary documents (protocols, drucksachen, proceedings)
    """

    def __init__(
        self,
        configuration: BundestagMineDatasourceConfiguration,
        client: Optional[BundestagMineClient] = None,
        dip_client: Optional[DIPClient] = None,
        logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
    ):
        """Initialize the Bundestag reader with multiple data sources.

        Args:
            configuration: Settings for Bundestag data access and export limits
            client: Client for BundestagMine API interactions (optional)
            dip_client: Client for DIP API interactions (optional)
            logger: Logger instance for recording operation information
        """
        super().__init__()
        self.configuration = configuration
        # Per-source cap: each enabled source may yield up to this many documents.
        self.export_limit = configuration.export_limit
        self.client = client
        self.dip_client = dip_client
        self.logger = logger

    async def _yield_limited(
        self, iterator, label: str
    ) -> AsyncIterator[dict]:
        """Yield up to ``export_limit`` items from a source iterator.

        Shared limit/progress-logging logic for all sources; replaces the
        previously duplicated per-source fetch loops.

        Args:
            iterator: Synchronous iterator of documents from a source client
            label: Human-readable source label used in progress log messages

        Yields:
            dict: Document dictionaries, capped at ``self.export_limit``
        """
        counter = 0
        # NOTE(review): the source iterator is consumed synchronously, so each
        # fetch blocks the event loop — confirm acceptable for callers.
        for item in iterator:
            if self._limit_reached(counter, self.export_limit):
                break

            self.logger.info(
                f"Fetched {label} {counter + 1}/{self.export_limit}."
            )
            counter += 1
            yield item

    async def read_all_async(
        self,
    ) -> AsyncIterator[dict]:
        """Asynchronously fetch all documents from enabled Bundestag sources.

        Yields documents from multiple sources based on configuration:
        - BundestagMine: Individual speeches
        - DIP: Comprehensive parliamentary documents

        Each enabled source gets the full export_limit, so if both sources are
        enabled with export_limit=2, you get 2 documents from each source (4 total).

        Returns:
            AsyncIterator[dict]: An async iterator of document dictionaries containing
            content and metadata such as text, speaker data, and last update information
        """
        self.logger.info(
            f"Reading Bundestag documents with limit {self.export_limit} per source"
        )

        # Source 1: BundestagMine speeches
        if self.configuration.include_bundestag_mine and self.client:
            self.logger.info(
                f"Fetching speeches from BundestagMine API (limit: {self.export_limit})..."
            )
            async for speech in self._yield_limited(
                self.client.fetch_all_speeches(), "BundestagMine speech"
            ):
                yield speech

        # Source 2: DIP API documents
        if self.configuration.include_dip and self.dip_client:
            self.logger.info(
                f"Fetching documents from DIP API (limit: {self.export_limit})..."
            )
            async for dip_document in self._yield_limited(
                self.dip_client.fetch_all(), "DIP document"
            ):
                yield dip_document

__init__(configuration, client=None, dip_client=None, logger=LoggerConfiguration.get_logger(__name__))

Initialize the Bundestag reader with multiple data sources.

Parameters:
  • configuration (BundestagMineDatasourceConfiguration) –

    Settings for Bundestag data access and export limits

  • client (Optional[BundestagMineClient], default: None ) –

    Client for BundestagMine API interactions (optional)

  • dip_client (Optional[DIPClient], default: None ) –

    Client for DIP API interactions (optional)

  • logger (Logger, default: get_logger(__name__) ) –

    Logger instance for recording operation information

Source code in src/extraction/datasources/bundestag/reader.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    configuration: BundestagMineDatasourceConfiguration,
    client: Optional[BundestagMineClient] = None,
    dip_client: Optional[DIPClient] = None,
    logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
):
    """Initialize the Bundestag reader with multiple data sources.

    Args:
        configuration: Settings for Bundestag data access and export limits
        client: Client for BundestagMine API interactions (optional)
        dip_client: Client for DIP API interactions (optional)
        logger: Logger instance for recording operation information

    Note:
        The default ``logger`` is created once at function-definition time,
        not per call; all instances without an explicit logger share it.
    """
    super().__init__()
    self.configuration = configuration
    # Per-source cap: each enabled source may yield up to this many documents.
    self.export_limit = configuration.export_limit
    # Either client may be None; read_all_async only reads from sources
    # whose client is set AND whose configuration flag is enabled.
    self.client = client
    self.dip_client = dip_client
    self.logger = logger

read_all_async() async

Asynchronously fetch all documents from enabled Bundestag sources.

Yields documents from multiple sources based on configuration:

- BundestagMine: Individual speeches
- DIP: Comprehensive parliamentary documents

Each enabled source gets the full export_limit, so if both sources are enabled with export_limit=2, you get 2 documents from each source (4 total).

Returns:
  • AsyncIterator[dict] –

    An async iterator of document dictionaries containing content and
    metadata such as text, speaker data, and last update information

Source code in src/extraction/datasources/bundestag/reader.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def read_all_async(
    self,
) -> AsyncIterator[dict]:
    """Asynchronously fetch all documents from enabled Bundestag sources.

    Yields documents from multiple sources based on configuration:
    - BundestagMine: Individual speeches
    - DIP: Comprehensive parliamentary documents

    Each enabled source gets the full export_limit, so if both sources are
    enabled with export_limit=2, you get 2 documents from each source (4 total).

    Returns:
        AsyncIterator[dict]: An async iterator of document dictionaries containing
        content and metadata such as text, speaker data, and last update information
    """
    self.logger.info(
        f"Reading Bundestag documents with limit {self.export_limit} per source"
    )

    # Source 1: BundestagMine speeches
    # Skipped unless both the config flag is on and a client was injected.
    if self.configuration.include_bundestag_mine and self.client:
        self.logger.info(
            f"Fetching speeches from BundestagMine API (limit: {self.export_limit})..."
        )
        # NOTE(review): the speech iterator is consumed synchronously (plain
        # for-loop), so each fetch blocks the event loop while iterating.
        speech_iterator = self.client.fetch_all_speeches()
        mine_counter = 0

        for speech in speech_iterator:
            if self._limit_reached(mine_counter, self.export_limit):
                break

            self.logger.info(
                f"Fetched BundestagMine speech {mine_counter + 1}/{self.export_limit}."
            )
            mine_counter += 1
            yield speech

    # Source 2: DIP API documents
    # Independent counter: the DIP source gets its own full export_limit.
    if self.configuration.include_dip and self.dip_client:
        self.logger.info(
            f"Fetching documents from DIP API (limit: {self.export_limit})..."
        )
        dip_iterator = self.dip_client.fetch_all()
        dip_counter = 0

        for dip_document in dip_iterator:
            if self._limit_reached(dip_counter, self.export_limit):
                break

            self.logger.info(
                f"Fetched DIP document {dip_counter + 1}/{self.export_limit}."
            )
            dip_counter += 1
            yield dip_document

BundestagMineDatasourceReaderFactory

Bases: Factory

Factory for creating Bundestag reader instances.

Creates and configures BundestagMineDatasourceReader objects with appropriate clients based on the provided configuration. Supports multiple data sources including BundestagMine and DIP APIs.

Source code in src/extraction/datasources/bundestag/reader.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class BundestagMineDatasourceReaderFactory(Factory):
    """Factory for creating Bundestag reader instances.

    Creates and configures BundestagMineDatasourceReader objects with appropriate
    clients based on the provided configuration. Supports multiple data sources
    including BundestagMine and DIP APIs.
    """

    _configuration_class = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BundestagMineDatasourceReader:
        """Creates a configured Bundestag reader instance.

        Initializes the appropriate clients (BundestagMine and/or DIP) based on
        configuration settings.

        Args:
            configuration: Bundestag connection and access settings

        Returns:
            BundestagMineDatasourceReader: Fully configured reader instance
        """
        # BundestagMine client: only built when that source is enabled.
        if configuration.include_bundestag_mine:
            bundestag_mine_client = BundestagMineClientFactory.create(
                configuration
            )
        else:
            bundestag_mine_client = None

        # DIP client: only built when that source is enabled.
        if configuration.include_dip:
            dip_client = DIPClient(
                api_key=configuration.dip_api_key,
                wahlperiode=configuration.dip_wahlperiode,
                fetch_sources=configuration.dip_sources,
            )
        else:
            dip_client = None

        return BundestagMineDatasourceReader(
            configuration=configuration,
            client=bundestag_mine_client,
            dip_client=dip_client,
        )