Bundestag Datasource

This module contains functionality related to the Bundestag datasource.

Client

BundestagMineClient

Bases: APIClient

API Client for the bundestag-mine.de API.

Source code in src/extraction/datasources/bundestag/client.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
class BundestagMineClient(APIClient):
    """
    API Client for the bundestag-mine.de API.

    Every endpoint is addressed relative to ``BASE_URL``. The low-level
    ``safe_get`` helper validates the HTTP status and the JSON envelope; the
    remaining methods wrap individual endpoints and validate the shape of
    the returned payload.
    """

    BASE_URL = "https://bundestag-mine.de/api/DashboardController"
    logger = LoggerConfiguration.get_logger(__name__)

    def safe_get(self, path: str) -> Any:
        """
        Perform a GET request, raise for HTTP errors, parse JSON, check API status.

        Args:
            path: endpoint path under BASE_URL, e.g. "GetProtocols" or
                  "GetAgendaItemsOfProtocol/<protocol_id>"

        Returns:
            Any: The 'result' field of the API response. This is a dict for
            most endpoints but a list for e.g. "GetProtocols". An empty dict
            is returned when the field is missing or null.

        Raises:
            ResponseParseError: if the HTTP status is not OK, the body is not
                valid JSON, or the JSON envelope has an unexpected structure.
        """
        url = f"{self.BASE_URL}/{path.lstrip('/')}"
        resp = self.get(url)
        try:
            resp.raise_for_status()
        except Exception as e:
            # Chain the cause so the original HTTP error stays in the traceback.
            raise ResponseParseError(f"HTTP error for {url}: {e}") from e

        try:
            data = resp.json()
        except ValueError as e:
            # JSON decode errors are ValueError subclasses in the stdlib json
            # module; presumably the response object follows that convention
            # (TODO confirm against APIClient.get). Callers only catch
            # ResponseParseError, so translate instead of leaking ValueError.
            raise ResponseParseError(
                f"Invalid JSON in response for {url}: {e}"
            ) from e

        # The envelope status is compared against the *string* "200".
        if not isinstance(data, dict) or data.get("status") != "200":
            raise ResponseParseError(f"Unexpected response for {url}: {data}")

        result = data.get("result")
        if result is None:
            self.logger.debug(f"No result found for {url}")
            return {}
        return result

    def get_protocols(self) -> List[Dict[str, Any]]:
        """
        Fetches the list of all protocols.

        Returns:
            List[Dict[str, Any]]: A list of protocols.

        Raises:
            ResponseParseError: if the API result is not a list (this includes
                the empty-dict fallback produced by ``safe_get``).
        """
        result = self.safe_get("GetProtocols")
        if not isinstance(result, list):
            raise ResponseParseError(
                f"Expected list of protocols, got: {result}"
            )
        return result

    def get_agenda_items(self, protocol_id: str) -> List[Dict[str, Any]]:
        """
        Fetches agenda items for a specific protocol ID.

        Args:
            protocol_id (str): The ID of the protocol.

        Returns:
            List[Dict[str, Any]]: A list of agenda items; empty when the
            response carries no "agendaItems" entry.

        Raises:
            ResponseParseError: if "agendaItems" is present but not a list.
        """
        result = self.safe_get(f"GetAgendaItemsOfProtocol/{protocol_id}")
        items = result.get("agendaItems")
        if items is None:
            self.logger.debug(f"No agenda items found for {protocol_id}")
            return []
        if not isinstance(items, list):
            raise ResponseParseError(
                f"Expected list of agendaItems for {protocol_id}, got: {items}"
            )
        return items

    def get_speaker_data(self, speaker_id: str) -> Dict[str, Any]:
        """
        Fetches the data record of a single speaker.

        Args:
            speaker_id (str): The ID of the speaker.

        Returns:
            Dict[str, Any]: The speaker data as returned by the API.

        Raises:
            ResponseParseError: if the API result is not a dict.
        """
        result = self.safe_get(f"GetSpeakerById/{speaker_id}")
        if not isinstance(result, dict):
            raise ResponseParseError(f"Expected speaker data, got: {result}")
        return result

    @retry_request
    def get_speeches(
        self,
        legislature_period: int,
        protocol_number: int,
        agenda_item_number: str,
    ) -> List[Dict[str, Any]]:
        """
        Fetches speeches for a specific agenda item within a protocol.

        Each speech that has a "speakerId" is enriched in place with a
        "speaker" entry holding the speaker data. Speeches whose speaker
        lookup fails are kept without that entry.

        Args:
            legislature_period (int): The legislature period.
            protocol_number (int): The protocol number.
            agenda_item_number (str): The agenda item number.

        Returns:
            List[Dict[str, Any]]: A list of speeches; empty when the response
            carries no "speeches" entry.

        Raises:
            ResponseParseError: if "speeches" is present but not a list.
        """
        # The endpoint takes the three identifiers as one comma-separated,
        # fully percent-encoded path segment.
        raw = f"{legislature_period},{protocol_number},{agenda_item_number}"
        encoded = quote(raw, safe="")
        result = self.safe_get(f"GetSpeechesOfAgendaItem/{encoded}")
        speeches = result.get("speeches")
        if speeches is None:
            self.logger.debug(f"No speeches found for {raw}")
            return []
        if not isinstance(speeches, list):
            raise ResponseParseError(
                f"Expected list of speeches for {raw}, got: {speeches}"
            )

        # add speaker data (best effort: a failed lookup must not drop the speech)
        for speech in speeches:
            speaker_id = speech.get("speakerId")
            if speaker_id is None:
                continue
            try:
                speaker_data = self.get_speaker_data(speaker_id)
                speech["speaker"] = speaker_data
            except (APIClientError, ResponseParseError):
                self.logger.debug(
                    f"Failed to get speaker data for {speaker_id}"
                )
                continue

        return speeches

    def fetch_all_speeches(self) -> Iterator[Dict[str, Any]]:
        """
        Lazily fetches all speeches by iterating through protocols and their
        agenda items.

        This is a generator: requests are only issued as the caller consumes
        it. Protocols or agenda items that are incomplete or whose requests
        fail are logged at debug level and skipped.

        Yields:
            Dict[str, Any]: One speech at a time.
        """
        try:
            protocols = self.get_protocols()
        except (APIClientError, ResponseParseError):
            self.logger.debug("Failed to get protocols")
            # A bare return ends the generator; a return value would be
            # invisible to callers iterating over it.
            return

        for prot in protocols:
            pid = prot.get("id")
            wp = prot.get("legislaturePeriod")
            num = prot.get("number")
            if not pid or wp is None or num is None:
                self.logger.debug(
                    f"Skipping protocol with missing data: {prot}. "
                )
                continue  # skip incomplete entries

            # normalize types
            try:
                wp_int = int(wp)
                num_int = int(num)
            except (ValueError, TypeError):
                self.logger.debug(
                    "Skipping protocol with non-integer legislature period "
                    f"or number: {prot}. "
                )
                continue

            # get agenda items
            try:
                items = self.get_agenda_items(pid)
            except (APIClientError, ResponseParseError):
                self.logger.debug(
                    f"Failed to get agenda items for protocol {pid}"
                )
                continue

            for item in items:
                ain = item.get("agendaItemNumber")
                if ain is None:
                    self.logger.debug(
                        f"Skipping agenda item with missing number: {item}. "
                    )
                    continue
                try:
                    speeches = self.get_speeches(wp_int, num_int, str(ain))
                except (APIClientError, ResponseParseError):
                    self.logger.debug(
                        f"Failed to get speeches for protocol {pid}, agenda item {ain}"
                    )
                    continue
                # Yield outside the try so that exceptions raised by the
                # consumer are never mistaken for a failed fetch.
                yield from speeches

fetch_all_speeches()

Fetches all speeches by iterating through protocols and their agenda items.

Returns:
  • Iterator[Dict[str, Any]]

    An iterator that lazily yields each speech found.

Source code in src/extraction/datasources/bundestag/client.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def fetch_all_speeches(self) -> Iterator[Dict[str, Any]]:
    """
    Lazily fetches all speeches by iterating through protocols and their
    agenda items.

    This is a generator: requests are only issued as the caller consumes it.
    Protocols or agenda items that are incomplete or whose requests fail are
    logged at debug level and skipped.

    Yields:
        Dict[str, Any]: One speech at a time.
    """
    try:
        protocols = self.get_protocols()
    except (APIClientError, ResponseParseError):
        self.logger.debug("Failed to get protocols")
        # A bare return ends the generator; a return value would be
        # invisible to callers iterating over it.
        return

    for prot in protocols:
        pid = prot.get("id")
        wp = prot.get("legislaturePeriod")
        num = prot.get("number")
        if not pid or wp is None or num is None:
            self.logger.debug(
                f"Skipping protocol with missing data: {prot}. "
            )
            continue  # skip incomplete entries

        # normalize types
        try:
            wp_int = int(wp)
            num_int = int(num)
        except (ValueError, TypeError):
            self.logger.debug(
                "Skipping protocol with non-integer legislature period "
                f"or number: {prot}. "
            )
            continue

        # get agenda items
        try:
            items = self.get_agenda_items(pid)
        except (APIClientError, ResponseParseError):
            self.logger.debug(
                f"Failed to get agenda items for protocol {pid}"
            )
            continue

        for item in items:
            ain = item.get("agendaItemNumber")
            if ain is None:
                self.logger.debug(
                    f"Skipping agenda item with missing number: {item}. "
                )
                continue
            try:
                speeches = self.get_speeches(wp_int, num_int, str(ain))
            except (APIClientError, ResponseParseError):
                self.logger.debug(
                    f"Failed to get speeches for protocol {pid}, agenda item {ain}"
                )
                continue
            # Yield outside the try so that exceptions raised by the consumer
            # are never mistaken for a failed fetch.
            yield from speeches

get_agenda_items(protocol_id)

Fetches agenda items for a specific protocol ID.

Parameters:
  • protocol_id (str) –

    The ID of the protocol.

Returns:
  • List[Dict[str, Any]]

    List[Dict[str, Any]]: A list of agenda items.

Source code in src/extraction/datasources/bundestag/client.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def get_agenda_items(self, protocol_id: str) -> List[Dict[str, Any]]:
    """
    Retrieve the agenda items that belong to one protocol.

    Args:
        protocol_id (str): Identifier of the protocol to look up.

    Returns:
        List[Dict[str, Any]]: The value of the response's "agendaItems"
        entry, or an empty list when that entry is absent or null.

    Raises:
        ResponseParseError: if "agendaItems" is present but not a list.
    """
    payload = self.safe_get(f"GetAgendaItemsOfProtocol/{protocol_id}")
    agenda_items = payload.get("agendaItems")

    # Absent entry: nothing to report, but not an error either.
    if agenda_items is None:
        self.logger.debug(f"No agenda items found for {protocol_id}")
        return []

    if isinstance(agenda_items, list):
        return agenda_items

    raise ResponseParseError(
        f"Expected list of agendaItems for {protocol_id}, got: {agenda_items}"
    )

get_protocols()

Fetches the list of all protocols.

Returns:
  • List[Dict[str, Any]]

    List[Dict[str, Any]]: A list of protocols.

Source code in src/extraction/datasources/bundestag/client.py
54
55
56
57
58
59
60
61
62
63
64
65
66
def get_protocols(self) -> List[Dict[str, Any]]:
    """
    Retrieve every protocol exposed by the "GetProtocols" endpoint.

    Returns:
        List[Dict[str, Any]]: The protocols exactly as returned by the API.

    Raises:
        ResponseParseError: if the API result is anything other than a list.
    """
    protocols = self.safe_get("GetProtocols")
    if isinstance(protocols, list):
        return protocols
    raise ResponseParseError(
        f"Expected list of protocols, got: {protocols}"
    )

get_speaker_data(speaker_id)

Source code in src/extraction/datasources/bundestag/client.py
89
90
91
92
93
94
def get_speaker_data(self, speaker_id: str) -> Dict[str, Any]:
    """
    Retrieve the data record of a single speaker.

    Args:
        speaker_id (str): Identifier of the speaker to look up.

    Returns:
        Dict[str, Any]: The speaker data as returned by the API.

    Raises:
        ResponseParseError: if the API result is not a dict.
    """
    speaker = self.safe_get(f"GetSpeakerById/{speaker_id}")
    if isinstance(speaker, dict):
        return speaker
    raise ResponseParseError(f"Expected speaker data, got: {speaker}")

get_speeches(legislature_period, protocol_number, agenda_item_number)

Fetches speeches for a specific agenda item within a protocol.

Parameters:
  • legislature_period (int) –

    The legislature period.

  • protocol_number (int) –

    The protocol number.

  • agenda_item_number (str) –

    The agenda item number.

Returns:
  • List[Dict[str, Any]]

    List[Dict[str, Any]]: A list of speeches.

Source code in src/extraction/datasources/bundestag/client.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@retry_request
def get_speeches(
    self,
    legislature_period: int,
    protocol_number: int,
    agenda_item_number: str,
) -> List[Dict[str, Any]]:
    """
    Retrieve the speeches held under one agenda item of a protocol.

    Each speech carrying a "speakerId" is enriched in place with a "speaker"
    entry holding the speaker data. If that lookup fails the speech is kept
    without the extra entry.

    Args:
        legislature_period (int): The legislature period.
        protocol_number (int): The protocol number.
        agenda_item_number (str): The agenda item number.

    Returns:
        List[Dict[str, Any]]: The speeches, or an empty list when the
        response has no "speeches" entry.

    Raises:
        ResponseParseError: if "speeches" is present but not a list.
    """
    # The endpoint expects the three identifiers as a single comma-separated,
    # fully percent-encoded path segment.
    raw = f"{legislature_period},{protocol_number},{agenda_item_number}"
    endpoint = f"GetSpeechesOfAgendaItem/{quote(raw, safe='')}"
    speeches = self.safe_get(endpoint).get("speeches")

    if speeches is None:
        self.logger.debug(f"No speeches found for {raw}")
        return []
    if not isinstance(speeches, list):
        raise ResponseParseError(
            f"Expected list of speeches for {raw}, got: {speeches}"
        )

    # Attach speaker data on a best-effort basis.
    for speech in speeches:
        speaker_id = speech.get("speakerId")
        if speaker_id is not None:
            try:
                speech["speaker"] = self.get_speaker_data(speaker_id)
            except (APIClientError, ResponseParseError):
                self.logger.debug(
                    f"Failed to get speaker data for {speaker_id}"
                )

    return speeches

safe_get(path)

Perform a GET request, raise for HTTP errors, parse JSON, check API status.

Parameters:
  • path (str) –

    endpoint path under BASE_URL, e.g. "GetProtocols" or "GetAgendaItemsOfProtocol/<protocol_id>"

Returns:
  • Dict[str, Any]

    Dict[str, Any]: The 'result' field of the API response as a dict.

Raises:
  • ResponseParseError

    if HTTP status is not OK or unexpected JSON structure.

Source code in src/extraction/datasources/bundestag/client.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def safe_get(self, path: str) -> Any:
    """
    Perform a GET request, raise for HTTP errors, parse JSON, check API status.

    Args:
        path: endpoint path under BASE_URL, e.g. "GetProtocols" or
              "GetAgendaItemsOfProtocol/<protocol_id>"

    Returns:
        Any: The 'result' field of the API response. This is a dict for most
        endpoints but may be a list (e.g. for "GetProtocols"). An empty dict
        is returned when the field is missing or null.

    Raises:
        ResponseParseError: if the HTTP status is not OK, the body is not
            valid JSON, or the JSON envelope has an unexpected structure.
    """
    url = f"{self.BASE_URL}/{path.lstrip('/')}"
    resp = self.get(url)
    try:
        resp.raise_for_status()
    except Exception as e:
        # Chain the cause so the original HTTP error stays in the traceback.
        raise ResponseParseError(f"HTTP error for {url}: {e}") from e

    try:
        data = resp.json()
    except ValueError as e:
        # JSON decode errors are ValueError subclasses in the stdlib json
        # module; presumably the response object follows that convention
        # (TODO confirm against the client's get()). Translate so callers
        # that catch ResponseParseError also see malformed bodies.
        raise ResponseParseError(
            f"Invalid JSON in response for {url}: {e}"
        ) from e

    # The envelope status is compared against the *string* "200".
    if not isinstance(data, dict) or data.get("status") != "200":
        raise ResponseParseError(f"Unexpected response for {url}: {data}")

    result = data.get("result")
    if result is None:
        self.logger.debug(f"No result found for {url}")
        return {}
    return result

BundestagMineClientFactory

Bases: SingletonFactory

Factory for creating and managing Bundestag client instances.

This factory ensures only one Bundestag client is created per configuration, following the singleton pattern provided by the parent SingletonFactory class.

Source code in src/extraction/datasources/bundestag/client.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class BundestagMineClientFactory(SingletonFactory):
    """
    Factory that produces BundestagMineClient instances.

    Instance reuse (one client per configuration) is delegated to the parent
    SingletonFactory; this class only declares the configuration type and how
    a fresh client is built.
    """

    # Configuration type this factory is registered for.
    _configuration_class: Type = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BundestagMineClient:
        """
        Build a new BundestagMine client.

        Args:
            configuration: Configuration object for the BundestagMine
                datasource. It is accepted to satisfy the factory interface
                but is not consulted when constructing the client.

        Returns:
            A new BundestagMineClient instance.
        """
        client = BundestagMineClient()
        return client

Configuration

Reader

BundestagMineDatasourceReader

Bases: BaseReader

Reader for extracting speeches from the BundestagMine API.

Implements document extraction from the Bundestag speeches.

Source code in src/extraction/datasources/bundestag/reader.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class BundestagMineDatasourceReader(BaseReader):
    """Reader for extracting speeches from the BundestagMine API.

    Implements document extraction from the Bundestag speeches.
    """

    def __init__(
        self,
        configuration: BundestagMineDatasourceConfiguration,
        client: BundestagMineClient,
        logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
    ):
        """Initialize the BundestagMine reader.

        Args:
            configuration: Datasource settings; only ``export_limit`` is read here
            client: Client for BundestagMine API interactions
            logger: Logger instance for recording operation information
        """
        super().__init__()
        self.export_limit = configuration.export_limit
        self.client = client
        self.logger = logger

    async def read_all_async(
        self,
    ) -> AsyncIterator[dict]:
        """Asynchronously fetch all speeches from BundestagMine.

        Yields each speech as a dictionary, stopping once ``export_limit``
        speeches have been produced (no limit when it is ``None``). The client
        iterator is never advanced past the limit, so no requests are issued
        for speeches that would be discarded.

        Yields:
            dict: A single speech as returned by the client.
        """
        self.logger.info(
            f"Fetching speeches from BundestagMine with limit {self.export_limit}"
        )
        limit = self.export_limit
        if limit is not None and limit <= 0:
            # Nothing may be exported: do not touch the client at all.
            return

        yield_counter = 0
        # NOTE(review): the client iterator is synchronous, so each step runs
        # inline inside this coroutine.
        for speech in self.client.fetch_all_speeches():
            yield speech
            yield_counter += 1
            # Check the limit *after* yielding so the loop stops before the
            # iterator fetches another speech.
            if limit is not None and yield_counter >= limit:
                break

__init__(configuration, client, logger=LoggerConfiguration.get_logger(__name__))

Initialize the BundestagMine reader.

Parameters:
  • configuration (BundestagMineDatasourceConfiguration) –

    Settings for BundestagMine access and export limits

  • client (BundestagMineClient) –

    Client for BundestagMine API interactions

  • logger (Logger, default: get_logger(__name__) ) –

    Logger instance for recording operation information

Source code in src/extraction/datasources/bundestag/reader.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(
    self,
    configuration: BundestagMineDatasourceConfiguration,
    client: BundestagMineClient,
    logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
):
    """Set up the reader with its client, logger and export limit.

    Args:
        configuration: Datasource settings; only ``export_limit`` is read here
        client: Client used for all BundestagMine API calls
        logger: Logger for operational messages
    """
    super().__init__()
    # Collaborators first, then the limit copied from the configuration.
    self.client = client
    self.logger = logger
    self.export_limit = configuration.export_limit

read_all_async() async

Asynchronously fetch all speeches from BundestagMine.

Yields each speech as a dictionary containing its content and metadata.

Returns:
  • AsyncIterator[dict]

    An async iterator of speech dictionaries containing content and metadata such as text, speaker data, and last update information

Source code in src/extraction/datasources/bundestag/reader.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
async def read_all_async(
    self,
) -> AsyncIterator[dict]:
    """Asynchronously fetch all speeches from BundestagMine.

    Yields each speech as a dictionary, stopping once ``export_limit``
    speeches have been produced (no limit when it is ``None``). The client
    iterator is never advanced past the limit, so no requests are issued for
    speeches that would be discarded.

    Yields:
        dict: A single speech as returned by the client.
    """
    self.logger.info(
        f"Fetching speeches from BundestagMine with limit {self.export_limit}"
    )
    limit = self.export_limit
    if limit is not None and limit <= 0:
        # Nothing may be exported: do not touch the client at all.
        return

    yield_counter = 0
    # NOTE(review): the client iterator is synchronous, so each step runs
    # inline inside this coroutine.
    for speech in self.client.fetch_all_speeches():
        yield speech
        yield_counter += 1
        # Check the limit *after* yielding so the loop stops before the
        # iterator fetches another speech.
        if limit is not None and yield_counter >= limit:
            break

BundestagMineDatasourceReaderFactory

Bases: Factory

Factory for creating BundestagMine reader instances.

Creates and configures BundestagMineDatasourceReader objects with appropriate clients based on the provided configuration.

Source code in src/extraction/datasources/bundestag/reader.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class BundestagMineDatasourceReaderFactory(Factory):
    """Factory that assembles BundestagMine reader instances.

    Obtains a client from BundestagMineClientFactory and wires it, together
    with the configuration, into a BundestagMineDatasourceReader.
    """

    # Configuration type this factory is registered for.
    _configuration_class = BundestagMineDatasourceConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: BundestagMineDatasourceConfiguration
    ) -> BundestagMineDatasourceReader:
        """Build a reader for the given configuration.

        The client is requested from BundestagMineClientFactory with the same
        configuration; the reader's logger is left at its default.

        Args:
            configuration: BundestagMine datasource settings

        Returns:
            BundestagMineDatasourceReader: Reader wired to its client
        """
        return BundestagMineDatasourceReader(
            client=BundestagMineClientFactory.create(configuration),
            configuration=configuration,
        )