diff --git a/docs/tested.rst b/docs/tested.rst index 6ca09ce..d34f27d 100644 --- a/docs/tested.rst +++ b/docs/tested.rst @@ -11,11 +11,13 @@ Postbank Yes BBBank eG Yes Yes Sparkasse Heidelberg Yes comdirect Yes Yes +Consorsbank Yes Yes ======================================== ============ ======== ======== ====== Tested security functions ------------------------- +* ``900`` "photoTAN" / "Secure Plus" (QR code) * ``902`` "photoTAN" * ``921`` "pushTAN" * ``930`` "mobile TAN" diff --git a/docs/transfers.rst b/docs/transfers.rst index c2e20c2..8f9fec2 100644 --- a/docs/transfers.rst +++ b/docs/transfers.rst @@ -67,13 +67,22 @@ Full example if isinstance(res, NeedTANResponse): print("A TAN is required", res.challenge) + # photoTAN / QR code: save and display the image + if getattr(res, 'challenge_matrix', None): + mime_type, image_data = res.challenge_matrix + with open('tan_challenge.png', 'wb') as f: + f.write(image_data) + print(f"QR code saved to tan_challenge.png ({len(image_data)} bytes)") + # Optionally open the image automatically: + # import subprocess; subprocess.Popen(['open', 'tan_challenge.png']) + if getattr(res, 'challenge_hhduc', None): try: terminal_flicker_unix(res.challenge_hhduc) except KeyboardInterrupt: pass - if result.decoupled: + if res.decoupled: tan = input('Please press enter after confirming the transaction in your app:') else: tan = input('Please enter TAN:') diff --git a/fints/client.py b/fints/client.py index 7e0898a..d6685b1 100644 --- a/fints/client.py +++ b/fints/client.py @@ -28,7 +28,7 @@ PinTanTwoStepAuthenticationMechanism, ) from .segments.accounts import HISPA1, HKSPA1 -from .segments.auth import HIPINS1, HKTAB4, HKTAB5, HKTAN2, HKTAN3, HKTAN5, HKTAN6, HKTAN7, HIVPPS1, HIVPP1, PSRD1, HKVPA1 +from .segments.auth import HIPINS1, HKTAB4, HKTAB5, HKTAN2, HKTAN3, HKTAN5, HKTAN6, HKTAN7, HIVPPS1, HIVPP1, HKVPP1, PSRD1, HKVPA1 from .segments.bank import HIBPA3, HIUPA4, HKKOM4 from .segments.debit import ( HKDBS1, HKDBS2, HKDMB1, HKDMC1, HKDME1, HKDME2, @@ -1253,16 +1253,24 @@ def _parse_tan_challenge(self): class FinTS3PinTanClient(FinTS3Client): - def __init__(self, bank_identifier, user_id, pin, server, customer_id=None, tan_medium=None, *args, **kwargs): + def __init__(self, bank_identifier, user_id, pin, server, customer_id=None, tan_medium=None, + force_twostep_tan=None, *args, **kwargs): self.pin = Password(pin) if pin is not None else pin self._pending_tan = None self.connection = FinTSHTTPSConnection(server) self.allowed_security_functions = [] self.selected_security_function = None self.selected_tan_medium = tan_medium + self.force_twostep_tan = set(force_twostep_tan) if force_twostep_tan else self._default_force_twostep_tan(bank_identifier) self._bootstrap_mode = True super().__init__(bank_identifier=bank_identifier, user_id=user_id, customer_id=customer_id, *args, **kwargs) + @staticmethod + def _default_force_twostep_tan(bank_identifier): + if str(bank_identifier) == '76030080': + return {'HKCCS', 'HKKAZ', 'HKSAL'} + return set() + def _new_dialog(self, lazy_init=False): if self.pin is None: enc = None @@ -1394,14 +1402,16 @@ def _find_vop_format_for_segment(self, seg): def _need_twostep_tan_for_segment(self, seg): if not self.selected_security_function or self.selected_security_function == '999': return False - else: - hipins = self.bpd.find_segment_first(HIPINS1) - if not hipins: - return False - else: - for requirement in hipins.parameter.transaction_tans_required: - if seg.header.type == requirement.transaction: - return requirement.tan_required + + if seg.header.type in self.force_twostep_tan: + return True + + hipins = self.bpd.find_segment_first(HIPINS1) + if not hipins: + return False + for requirement in hipins.parameter.transaction_tans_required: + if seg.header.type == requirement.transaction: + return requirement.tan_required return False @@ -1414,12 +1424,13 @@ def _send_with_possible_retry(self, dialog, command_seg, resume_func): for resp in response.responses(tan_seg): if resp.code in ('0030', '3955'): + decoupled = any(r.code == '3955' for r in response.responses(tan_seg)) return NeedTANResponse( command_seg, response.find_segment_first('HITAN'), resume_func, self.is_challenge_structured(), - resp.code == '3955', + decoupled, ) if resp.code.startswith('9'): raise Exception("Error response: {!r}".format(response)) @@ -1443,11 +1454,16 @@ def _send_pay_with_possible_retry(self, dialog, command_seg, resume_func): - 'RVNM' - no match, no extra info seen - 'RVNA' - check not available, reason in single_vop_result.na_reason - 'PDNG' - pending, seems related to something not implemented right now. + + VoP polling flow (FinTS spec E.8.3.1): + Some banks return HIVPP with no vop_id but a polling_id and code 3040:aufsetzpunkt. + The client must poll by re-sending HKVPP with polling_id + aufsetzpunkt (without + HKCCS/HKTAN) until the bank returns HIVPP with a vop_id and the actual VoP result. + After that, the client sends HKVPA + HKCCS + HKTAN to authorize. """ vop_seg = [] vop_standard = self._find_vop_format_for_segment(command_seg) if vop_standard: - from .segments.auth import HKVPP1 vop_seg = [HKVPP1(supported_reports=PSRD1(psrd=[vop_standard]))] with dialog: @@ -1460,9 +1476,52 @@ def _send_pay_with_possible_retry(self, dialog, command_seg, resume_func): if vop_standard: hivpp = response.find_segment_first(HIVPP1, throw=True) + # Check if VOP polling is required: HIVPP has no vop_id but has polling_id + if not hivpp.vop_id and hivpp.polling_id: + # Extract aufsetzpunkt from HIRMS 3040 response + aufsetzpunkt = None + for hirms_seg in response.find_segments(HIRMS2): + for resp in hirms_seg.responses: + if resp.code == '3040' and resp.parameters: + aufsetzpunkt = resp.parameters[0] + + wait_seconds = int(hivpp.wait_for_seconds) if hivpp.wait_for_seconds else 2 + logger.info("VoP polling required (polling_id=%r, aufsetzpunkt=%r, wait=%ds)", + hivpp.polling_id, aufsetzpunkt, wait_seconds) + + import time + time.sleep(wait_seconds) + + # Poll: send HKVPP with polling_id + aufsetzpunkt (no HKCCS, no HKTAN) + poll_seg = HKVPP1( + supported_reports=PSRD1(psrd=[vop_standard]), + polling_id=hivpp.polling_id, + aufsetzpunkt=aufsetzpunkt, + ) + poll_response = dialog.send(poll_seg) + hivpp = poll_response.find_segment_first(HIVPP1, throw=True) + logger.info("VoP poll result: vop_id=%r", hivpp.vop_id) + vop_result = hivpp.vop_single_result - # Not Applicable, No Match, Close Match, or exact match but still requires confirmation - if vop_result.result in ('RVNA', 'RVNM', 'RVMC') or (vop_result.result == 'RCVC' and '3945' in [res.code for res in response.responses(tan_seg)]): + # Not Applicable, No Match, Close Match, or exact match but still requires confirmation + tan_codes = [res.code for res in response.responses(tan_seg)] + command_codes = [res.code for res in response.responses(command_seg)] + all_codes = [] + for seg in response.find_segments((HIRMG2, HIRMS2)): + all_codes.extend(r.code for r in seg.responses) + + # If we have a vop_id (from initial or polling), return NeedVOPResponse + # so the caller can inspect the result and then call approve_vop_response + if hivpp.vop_id: + return NeedVOPResponse( + vop_result=hivpp, + command_seg=command_seg, + resume_method=resume_func, + ) + + if vop_result and (vop_result.result in ('RVNA', 'RVNM', 'RVMC') or ( + vop_result.result == 'RCVC' and '3945' in all_codes + )): return NeedVOPResponse( vop_result=hivpp, command_seg=command_seg, @@ -1473,16 +1532,37 @@ def _send_pay_with_possible_retry(self, dialog, command_seg, resume_func): for resp in response.responses(tan_seg): if resp.code in ('0030', '3955'): + # Consorsbank returns 0030 together with 3955 + # ("Sicherheitsfreigabe erfolgt über anderen Kanal") + # for decoupled app approval. Treat the operation as + # decoupled whenever 3955 is present, regardless of the + # order in which the codes appear. + decoupled = any(r.code == '3955' for r in response.responses(tan_seg)) return NeedTANResponse( command_seg, response.find_segment_first('HITAN'), resume_func, self.is_challenge_structured(), - resp.code == '3955', + decoupled, hivpp, ) if resp.code.startswith('9'): raise Exception("Error response: {!r}".format(response)) + + # Some banks (e.g. Consorsbank) attach the 0030 TAN-required + # response to the command segment (HKCCS) rather than the + # HKTAN segment. Check command_seg responses as fallback. + for resp in response.responses(command_seg): + if resp.code in ('0030', '3955'): + decoupled = any(r.code == '3955' for r in response.responses(command_seg)) + return NeedTANResponse( + command_seg, + response.find_segment_first('HITAN'), + resume_func, + self.is_challenge_structured(), + decoupled, + hivpp, + ) else: response = dialog.send(command_seg) @@ -1518,6 +1598,30 @@ def approve_vop_response(self, challenge: NeedVOPResponse): challenge.vop_result, ) + for resp in response.responses(challenge.command_seg): + if resp.code in ('0030', '3955'): + return NeedTANResponse( + challenge.command_seg, + response.find_segment_first('HITAN'), + challenge.resume_method, + self.is_challenge_structured(), + resp.code == '3955', + challenge.vop_result, + ) + + for seg in response.find_segments((HIRMG2, HIRMS2)): + for resp in seg.responses: + if resp.code not in ('0030', '3955'): + continue + return NeedTANResponse( + challenge.command_seg, + response.find_segment_first('HITAN'), + challenge.resume_method, + self.is_challenge_structured(), + resp.code == '3955', + challenge.vop_result, + ) + resume_func = getattr(self, challenge.resume_method) return resume_func(challenge.command_seg, response) @@ -1554,7 +1658,7 @@ def send_tan(self, challenge: NeedTANResponse, tan: str): "No TAN status received." ) for resp in response.responses(tan_seg): - if resp.code == '3956': + if resp.code == '3956' or self._is_decoupled_signature_pending(resp): return NeedTANResponse( challenge.command_seg, challenge.tan_request, @@ -1566,6 +1670,14 @@ def send_tan(self, challenge: NeedTANResponse, tan: str): resume_func = getattr(self, challenge.resume_method) return resume_func(challenge.command_seg, response) + @staticmethod + def _is_decoupled_signature_pending(response): + return ( + response.code == '9010' + and 'Unterschriften' in str(getattr(response, 'text', '')) + and 'nicht ausreichend' in str(getattr(response, 'text', '')) + ) + def _process_response(self, dialog, segment, response): if response.code == '3920' and not self.bank_identifier == ING_BANK_IDENTIFIER: self.allowed_security_functions = list(response.parameters) @@ -1587,7 +1699,7 @@ def _process_response(self, dialog, segment, response): # Fall back to onestep self.set_tan_mechanism('999') - if response.code == '9010': + if response.code == '9010' and not dialog.open: raise FinTSClientError("Error during dialog initialization, could not fetch BPD. Please check that you " "passed the correct bank identifier to the HBCI URL of the correct bank.") diff --git a/fints/dialog.py b/fints/dialog.py index e1f66aa..7466b12 100644 --- a/fints/dialog.py +++ b/fints/dialog.py @@ -85,8 +85,14 @@ def init(self, *extra_segments): retval = self.send(*segments, internal_send=True) if tan_seg: - for resp in retval.responses(tan_seg): - if resp.code in ('0030', '3955'): + # Some banks (e.g. Consorsbank) attach the login-SCA + # 0030/3955 response to the HKIDN segment instead of the + # HKTAN segment, so check both references. + for ref in (tan_seg, segments[0]): + if self.client.init_tan_response is not None: + break + ref_responses = list(retval.responses(ref)) + if any(resp.code in ('0030', '3955') for resp in ref_responses): self.client.init_tan_response = NeedTANResponse( None, retval.find_segment_first('HITAN'), @@ -94,9 +100,12 @@ def init(self, *extra_segments): self.client.is_challenge_structured(), False, ) - if resp.code == '3955': + # 3955 ("Sicherheitsfreigabe erfolgt über anderen + # Kanal") flags decoupled app approval; it may appear + # alongside 0030, so check all responses, not just + # the first matching code. + if any(resp.code == '3955' for resp in ref_responses): self.client.init_tan_response.decoupled = True - break self.need_init = False return retval diff --git a/fints/formals.py b/fints/formals.py index 2d9d907..3afe8ca 100644 --- a/fints/formals.py +++ b/fints/formals.py @@ -543,6 +543,12 @@ def from_sepa_account(cls, acc): return cls( iban=acc.iban, bic=acc.bic, + account_number=acc.accountnumber, + subaccount_number=acc.subaccount, + bank_identifier=BankIdentifier( + country_identifier=BankIdentifier.COUNTRY_ALPHA_TO_NUMERIC[acc.bic[4:6]], + bank_code=acc.blz + ) if acc.blz else None, ) diff --git a/fints/security.py b/fints/security.py index e9f14e1..f65b5a8 100644 --- a/fints/security.py +++ b/fints/security.py @@ -104,8 +104,15 @@ def sign_prepare(self, message: FinTSMessage): _now = datetime.datetime.now() rand = random.SystemRandom() + # Per ZKA FinTS spec, two-step TAN methods (security_function != '999') + # require security_method_version=2 in the SecurityProfile. + if self.security_function and self.security_function != '999': + security_method_version = 2 + else: + security_method_version = 1 + self.pending_signature = HNSHK4( - security_profile=SecurityProfile(SecurityMethod.PIN, 1), + security_profile=SecurityProfile(SecurityMethod.PIN, security_method_version), security_function=self.security_function, security_reference=rand.randint(1000000, 9999999), security_application_area=SecurityApplicationArea.SHM, diff --git a/sample_consorsbank.py b/sample_consorsbank.py new file mode 100644 index 0000000..2d57ae9 --- /dev/null +++ b/sample_consorsbank.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python3 +""" +Sample: Consorsbank (BLZ 76030080) with python-fints. + +Demonstrates fetching accounts/transactions and making a SEPA transfer with +either of Consorsbank's two current TAN methods. + +TAN methods +----------- +Consorsbank advertises two two-step TAN mechanisms: + +* **901 "Consorsbank/myPrivateBank App"** (``zka_id = "Decoupled"``) — the new + Consorsbank App. Login asks for a *typed* 9-digit TAN generated in the app; + a SEPA transfer is *approved in the app* (decoupled), no TAN is typed. +* **900 "SecurePlus TAN Generator"** (``zka_id = "photoTAN"``) — login needs no + TAN (the bank answers the dialog-init with ``3076``); a SEPA transfer returns + an order-bound **photoTAN QR image** (``response.challenge_matrix``) that is + scanned, after which the resulting TAN is typed. + +Both were exercised end-to-end against Consorsbank. A real SEPA transfer with +mechanism ``901`` was accepted and booked. + +.. note:: + The old **SecurePlus *App*** (the smartphone app, distinct from the + SecurePlus *TAN-generator device*) was shut down for Consorsbank online + banking on **2026-04-25**; since then it returns "TAN-Verfahren ungültig" + and any TAN it produces — including ones scanned from the ``900`` photoTAN + QR — is rejected with ``9941 TAN ungültig``. Use the new **Consorsbank App** + (``901``) or the **physical SecurePlus TAN-generator device** (``900``). + The ``900`` code path here is correct; only the decommissioned app's TAN is + refused by the bank. (Source: kritische-anleger.de SecurePlus-App shutdown + report, and the official Consorsbank HBCI FAQ.) + +Protocol quirks handled by python-fints (see PR #209 and the login-SCA fix) +-------------------------------------------------------------------------- +1. ``security_method_version=2`` for two-step TAN. +2. Full account details in ``KTI1.from_sepa_account``. +3. ``force_twostep_tan`` for segments the bank requires a TAN on despite + reporting otherwise in HIPINS (otherwise: ``9075``). +4. The TAN-required response ``0030`` is attached to the **command segment** + (``HKCCS``) instead of ``HKTAN``. +5. The **login** strong-customer-authentication response ``0030`` is attached + to the **HKIDN** segment of the dialog-init, not ``HKTAN``. Without + detecting it, the next command aborts the dialog with ``9800/9120``. +6. For decoupled app approval, Consorsbank returns ``0030`` **together with** + ``3955`` ("Sicherheitsfreigabe erfolgt über anderen Kanal"). python-fints + now flags the challenge as ``decoupled`` whenever ``3955`` is present. + +What the user sees with the Consorsbank App (mechanism 901) +----------------------------------------------------------- +* **Login**: the bank asks for a *typed* TAN. Open the Consorsbank App, + generate the (9-digit) TAN and type it in. ``response.decoupled`` is False. +* **SEPA transfer**: the bank pushes the order to the app for approval. + ``response.decoupled`` is True; the user approves in the app and the client + polls until the bank confirms — no TAN is typed. + +Usage: + pip install python-fints python-dotenv + python sample_consorsbank.py + +Environment variables (or .env file): + FINTS_BLZ=76030080 + FINTS_USER= + FINTS_PIN= + FINTS_SERVER=https://brokerage-hbci.consorsbank.de/hbci + FINTS_PRODUCT_ID= + FINTS_TAN_MECHANISM=901 # 901 = Consorsbank App, 900 = SecurePlus generator + MY_IBAN= + # To actually send money, set all of these: + TRANSFER_TO_IBAN= + TRANSFER_TO_NAME= + TRANSFER_AMOUNT=1.00 + TRANSFER_REASON=Test transfer +""" + +import os +import time +import logging +from datetime import date, timedelta +from decimal import Decimal + +from fints.client import FinTS3PinTanClient, NeedTANResponse, NeedVOPResponse + +logging.basicConfig(level=logging.WARNING) + +try: + from dotenv import load_dotenv + load_dotenv() +except ImportError: + pass + + +def handle_tan(response, client): + """Resolve TAN / VoP / decoupled challenges. + + * Decoupled (e.g. Consorsbank App): the user approves the order inside the + banking app; we poll with ``send_tan`` until the bank confirms. + * photoTAN / QR: an image is shown to scan, then the TAN is typed. + * Plain: the user types the TAN shown by the app/generator. + """ + while isinstance(response, (NeedTANResponse, NeedVOPResponse)): + if isinstance(response, NeedVOPResponse): + # Verification of Payee result; approve and continue. + response = client.approve_vop_response(response) + continue + + print(f"\nTAN required: {response.challenge}") + + # Decoupled app approval (Consorsbank App): no TAN is typed. + if response.decoupled: + input("Approve the request in your Consorsbank App, then press ENTER... ") + # Poll the bank until the decoupled approval is registered. + response = client.send_tan(response, "") + while isinstance(response, NeedTANResponse) and response.decoupled: + time.sleep(4) + response = client.send_tan(response, "") + continue + + # photoTAN / QR code image (e.g. SecurePlus generator). + if response.challenge_matrix: + mime_type, image_data = response.challenge_matrix + ext = ".png" if "png" in mime_type else ".jpg" + img_path = f"tan_challenge{ext}" + with open(img_path, "wb") as f: + f.write(image_data) + print(f" Challenge image saved to {img_path} ({len(image_data)} bytes)") + tan = input("Scan the image and enter the TAN: ") + else: + # Plain typed TAN (e.g. Consorsbank App login TAN, 9 digits). + tan = input("Enter the TAN from your app/generator: ") + + response = client.send_tan(response, tan) + + return response + + +def main(): + blz = os.environ.get("FINTS_BLZ", "76030080") + user = os.environ["FINTS_USER"] + pin = os.environ["FINTS_PIN"] + server = os.environ.get("FINTS_SERVER", "https://brokerage-hbci.consorsbank.de/hbci") + product_id = os.environ.get("FINTS_PRODUCT_ID") + mechanism = os.environ.get("FINTS_TAN_MECHANISM", "901") + my_iban = os.environ.get("MY_IBAN") + + client = FinTS3PinTanClient( + bank_identifier=blz, + user_id=user, + pin=pin, + server=server, + product_id=product_id, + # Consorsbank reports HKKAZ:N / HKSAL:N in HIPINS but actually requires + # a TAN for them; HKCCS always requires a TAN. + force_twostep_tan={"HKKAZ", "HKSAL"}, + ) + + # 901 = Consorsbank App (current), 900 = physical SecurePlus TAN generator. + if not client.get_current_tan_mechanism(): + client.fetch_tan_mechanisms() + client.set_tan_mechanism(mechanism) + + with client: + # Login strong-customer-authentication (typed TAN with the app). + if client.init_tan_response: + handle_tan(client.init_tan_response, client) + + # --- Fetch accounts --- + accounts = client.get_sepa_accounts() + if isinstance(accounts, NeedTANResponse): + accounts = handle_tan(accounts, client) + + print("Accounts:") + for a in accounts: + print(f" {a.iban} (BIC: {a.bic})") + + if my_iban: + account = next((a for a in accounts if a.iban == my_iban), None) + if not account: + print(f"Account {my_iban} not found") + return + else: + account = accounts[0] + print(f"\nUsing account: {account.iban}") + + # --- Fetch transactions --- + print("\nFetching transactions (last 30 days)...") + start_date = date.today() - timedelta(days=30) + res = client.get_transactions(account, start_date=start_date) + if isinstance(res, (NeedTANResponse, NeedVOPResponse)): + res = handle_tan(res, client) + if res: + print(f"Found {len(res)} transactions; showing last 5:") + for t in res[-5:]: + d = t.data + amt = d.get("amount") + amount_str = f"{amt.amount:>10.2f} {amt.currency}" if amt else "" + print(f" {d.get('date')} {amount_str} {d.get('applicant_name', '')}") + else: + print("No transactions found.") + + # --- SEPA transfer (approved in the Consorsbank App) --- + to_iban = os.environ.get("TRANSFER_TO_IBAN") + if to_iban: + print(f"\nSubmitting SEPA transfer to {to_iban} ...") + res = client.simple_sepa_transfer( + account=account, + iban=to_iban, + bic=os.environ.get("TRANSFER_TO_BIC", ""), + recipient_name=os.environ["TRANSFER_TO_NAME"], + amount=Decimal(os.environ.get("TRANSFER_AMOUNT", "1.00")), + account_name=os.environ.get("TRANSFER_FROM_NAME", user), + reason=os.environ.get("TRANSFER_REASON", "Test transfer"), + ) + # The bank pushes the order to the Consorsbank App for approval. + res = handle_tan(res, client) + print(f"Transfer result: {getattr(res, 'status', None)} {getattr(res, 'responses', None)}") + else: + print("\n(Set TRANSFER_TO_IBAN/TRANSFER_TO_NAME to perform a transfer.)") + + print("\nDone!") + + +if __name__ == "__main__": + main()