saxoforn

SaxoForN data extraction tool
git clone git://arztpraxis-lohmann.de/saxoforn
Log | Files | Refs | LICENSE

commit 5ac29db421ca6cdb04f5ef1cff5fb06bd8826fa8
parent 4e9af0a6535da6389c5eb1f843623655d9591ebd
Author: Christoph Lohmann <20h@r-36.net>
Date:   Wed, 25 Oct 2023 13:22:17 +0200

Add Q32023 SaxoForN-Umfrage.

Diffstat:
AsaxofornQ32023.py | 419+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 419 insertions(+), 0 deletions(-)

diff --git a/saxofornQ32023.py b/saxofornQ32023.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python +# coding=utf-8 +# +# © 2019-2023 Christoph Lohmann <20h@r-36.net> +# +# This file is published under the terms of the GPLv3. +# + +import os +import sys +import getopt +import codecs +from datetime import datetime +import operator + +topicno = 1 + +# Return num of patients who have one occurence of some ebmcodes. +def ebmcodeoccurence(container, ebmcodes): + occurence = 0 + for sentence in container["sentences"]: + found = 0 + for field in sentence["fields"]: + if field[0] == 5001: + if field[1] in ebmcodes: + if found == 0: + occurence += 1 + found = 1 + return occurence + +# Return sum of all occurences of some ebmcodes. +def ebmcodesumoccurence(container, ebmcodes): + occurence = 0 + for sentence in container["sentences"]: + for field in sentence["fields"]: + if field[0] == 5001: + if field[1] in ebmcodes: + occurence += 1 + return occurence + +def expandcodes(codes): + newcodes = [] + for code in codes: + if code.endswith(".-"): + for c in "012345678": + newcodes.append(code.replace("-", c)) + else: + newcodes.append(code) + return newcodes + +def printcode(codesdb, code): + if code in codesdb: + print("%s = %d" % (code, codesdb[code])) + else: + print("%s = 0" % (code)) + +def printcodes(codesdb, codes): + newcodes = expandcodes(codes) + for code in newcodes: + printcode(codesdb, code) + +def printcodessum(codesdb, codes): + codesum = 0 + newcodes = expandcodes(codes) + for code in newcodes: + if code in codesdb: + codesum += codesdb[code] + print("%s = %d" % (newcodes, codesum)) + +def sentence2dict(sentence): + rdict = {} + + if not "fields" in sentence: + return rdict + + for field in sentence["fields"]: + fstr = "%s" % (field[0]) + if not fstr in rdict: + rdict[fstr] = [] + rdict[fstr].append(field[1]) + + return rdict + +def parse_xdt(line): + length = int(line[:3]) + fieldid = int(line[3:7]) + fcontent = line[7:-2] + + if length != len(line): + print("Invalid line: %s\n" % (line[:-2])) + + return (length, fieldid, fcontent) + +def usage(app): + app = os.path.basename(app) + print("usage: %s [-h] adt.con" % (app), file=sys.stderr) + sys.exit(1) + +def main(args): + try: + opts, largs = getopt.getopt(args[1:], "h") + except getopt.GetoptError as err: + print(str(err)) + usage(args[0]) + + for o, a in opts: + if o == "-h": + usage(args[0]) + else: + assert False, "unhandled option" + + if len(largs) < 1: + usage(args[0]) + + # Aerzte + besa = {} + # Betriebsstaette + rvsa = {} + # PVS-Hersteller + adt0 = {} + + begin = datetime.now() + lines = 0 + containers = [] + container = {} + sentence = {} + with codecs.open(largs[0], "r", "iso8859-15") as fd: + for line in fd: + (length, fieldid, fcontent) = parse_xdt(line) + lines += 1 + + #print((length, fieldid, fcontent)) + if fieldid == 8000: + if fcontent == "con0": + container["name"] = fcontent + container["sentences"] = [] + container["fields"] = [] + elif fcontent == "con9": + if sentence != {}: + container["sentences"].append(sentence) + containers.append(container) + container = {} + sentence = {} + elif fcontent == "adt9": + if sentence != {}: + container["sentences"].append(sentence) + continue + elif fcontent == "kad9": + if sentence != {}: + container["sentences"].append(sentence) + continue + elif fcontent == "sad9": + if sentence != {}: + container["sentences"].append(sentence) + continue + else: + sentence = {} + sentence["name"] = fcontent + sentence["fields"] = [] + container["sentences"].append(sentence) + + # Filter out special data. + if sentence["name"] == "besa": + besa = sentence + elif sentence["name"] == "rvsa": + rvsa = sentence + elif sentence["name"] == "adt0": + adt0 = sentence + continue + else: + if "fields" in sentence: + sentence["fields"].append([fieldid, + fcontent]) + else: + container["fields"].append([fieldid, + fcontent]) + + besa = sentence2dict(besa) + rvsa = sentence2dict(rvsa) + adt0 = sentence2dict(adt0) + + ambscheine = 0 + notfallscheine = 0 + ueberweisungen = 0 + belegarztscheine = 0 + for sentence in containers[0]["sentences"]: + if sentence["name"] == "0101": + ambscheine += 1 + elif sentence["name"] == "0104": + notfallscheine += 1 + elif sentence["name"] == "0102": + ueberweisungen += 1 + elif sentence["name"] == "0103": + belegarztscheine +=1 + + gesamtscheine = ambscheine + notfallscheine + ueberweisungen + \ + belegarztscheine + + # Anonymisation dictionaries. + # All ICD codes, multiple codes per patient. + icdcodes = {} + # ICD codes, one per patient. + icdcodespp = {} + # All EBM codes, multiple codes per patient. + ebmcodes = {} + # EBM codes, one per patient. + ebmcodespp = {} + + for sentence in containers[0]["sentences"]: + icdcodefound = 0 + ebmcodefound = 0 + for field in sentence["fields"]: + if field[0] == 5001: + if field[1] in ebmcodes: + ebmcodes[field[1]] += 1 + else: + ebmcodes[field[1]] = 1 + + if ebmcodefound == 0: + if field[1] in ebmcodespp: + ebmcodespp[field[1]] += 1 + else: + ebmcodespp[field[1]] = 1 + else: + ebmcodefound = 1 + elif field[0] == 6001: + if field[1] in icdcodes: + icdcodes[field[1]] += 1 + else: + icdcodes[field[1]] = 1 + + if icdcodefound == 0: + if field[1] in icdcodespp: + icdcodespp[field[1]] += 1 + else: + icdcodespp[field[1]] = 1 + else: + icdcodefound = 1 + + def sortdict(d): + return sorted(d.items(), key=lambda kv: kv[1])[::-1] + + def printtopic(topic): + global topicno + print("%d.) %s" % (topicno, topic)) + topicno += 1 + + oicdcodes = sortdict(icdcodes) + oebmcodes = sortdict(ebmcodes) + + print("SaxoForN-Datenerhebung:") + print("") + + printtopic("PVS") + pvshersteller = None + if "111" in adt0: + if "dv@data-vital.de" in adt0["111"][0]: + pvshersteller = "DATA VITAL" + print("PVS-Hersteller = %s" % (pvshersteller)) + print("") + + printtopic("Gesamtscheine") + # KBV_ITA_VGEX_Datensatzbeschreibung_KVDT.pdf#S27 + print("amb. Scheine ..... %d" % (ambscheine)) + print("Notfallscheine ... %d" % (notfallscheine)) + print("Ueberweisungen ... %d" % (ueberweisungen)) + print("Belegarztscheine . %d" % (belegarztscheine)) + print("=========================") + print("Gesamtscheine .... %d" % (gesamtscheine)) + print("") + + printtopic("Grundpauschalen") + for grundpauschale in ["03001", "04001", "03002", "04002",\ + "03003", "04003", "03004", "04004", "03005",\ + "04005"]: + printcode(ebmcodespp, grundpauschale) + print("") + + printtopic("Privatpatienten") + # Consart "P" + # Ziffer "1" + print("") + + #printtopic("AU-Anfragen") + # Krankschreibung: "AU:" + + printtopic("Anfragen Krankenkassen Muster 52") + printcode(ebmcodespp, "01622") + print("") + + #printtopic("Einweisungen") + # TODO: Parse BDT. + # In Bemerkungsfeld: + # Einweisungen: "KH:" + # Ueberweisungen: "UE:" + # Krankschreibung: "AU:" + # Transportschein: "TA:" + # Laborbemerkung: "UL:" + #print("") + + printtopic("COVID-19 Nachweise, PCR-Teste") + printcode(ebmcodespp, "88315") + + sachsencovid19 = ["99136", "88240", "02402", "32006"] + print("Sachsen: %s = %d" % (sachsencovid19,\ + ebmcodeoccurence(containers[0], sachsencovid19))) + hessencovid19 = ["88240", "02402"] + print("Hessen: %s = %d" % (hessencovid19,\ + ebmcodeoccurence(containers[0], hessencovid19))) + print("") + + printtopic("COVID-19 ICDs") + for covid19icd in ["U07.1", "U07.2", "U09.9", "U08.9", "U10.9", "U12.9"]: + printcode(icdcodes, covid19icd) + print("") + + printtopic("Videosprechstunde") + videosprechstunde = ["01450", "40129", "40128", "01444",\ + "01442"] + print("EBM-Codes: %s = %d" % (videosprechstunde,\ + ebmcodeoccurence(containers[0],\ + videosprechstunde))) + print("Bitte Videosprechstundeanbieter heraussuchen.") + print("") + + printtopic("Hausbesuche") + arzthausbesuche = ["01410", "01411", "01412", "01413", "01415"] + naepahausbesuche = ["03062", "03063", "38100"] + impfhausbesuche = ["88323", "88324"] + + print("Arzthaubesuche: %s = %d" % (arzthausbesuche, + ebmcodesumoccurence(containers[0], arzthausbesuche))) + print("Naepahaubesuche: %s = %d" % (naepahausbesuche, + ebmcodesumoccurence(containers[0], naepahausbesuche))) + print("Impfhausbesuche: %s = %d" % (impfhausbesuche, + ebmcodesumoccurence(containers[0], impfhausbesuche))) + print("") + + printtopic("App auf Rezept") + print("") + + printtopic("Schilddrüsenerkrankungen") + printcodessum(icdcodes, ["E01.0", "E01.1", "E01.2", "E04.-"]) + printcodessum(icdcodes, ["E02"]) + printcodessum(icdcodes, ["E03.-", "E89.0"]) + printcodessum(icdcodes, ["E05.-"]) + printcodessum(icdcodes, ["E06.-"]) + printcodessum(icdcodes, ["E07.9"]) + print("") + + printtopic("ePA-Nutzung") + printcode(icdcodes, "88270") + print("TI-Anbindung? ja/nein") + print("Update auf ePA-Konnektor? ja/nein") + print("PVS-Modul ePA? ja/nein") + print("eHBA 2.0 bestellt? ja/nein") + print("Keine Anschaffung bisher? ja/nein") + print("") + + print("Influenza-Impfungen") + influenza = ["89111", "89111S", "89112", "89112Y"] + printcodes(ebmcodes, influenza) + print("") + + print("Pneumokokken-Impfung") + pneumokokken = ["89119", "89119R",\ + "89120", "89120R",\ + "89120V", "89120X"] + printcodes(ebmcodes, pneumokokken) + print("") + + printtopic("COVID-19-Impfungen") + biontech = ["88331A", "88331B", "88331R",\ + "88331G", "88331H", "88331K",\ + "88331V", "88331W", "88331X"] + printcodes(ebmcodes, biontech) + biontechneu = ["88337A", "88337B",\ + "88337V", "88337W",\ + "88337G", "88337H",\ + "88337K", "88337X"] + printcodes(ebmcodes, biontechneu) + moderna = ["88332A", "88332B", "88332R",\ + "88332G", "88332H", "88332K",\ + "88332V", "88332W", "88332X"] + printcodes(ebmcodes, moderna) + johnsonjohnson = ["88334", "88334R",\ + "88334I", "88334K",\ + "88334Y", "88334X"] + printcodes(ebmcodes, johnsonjohnson) + astrazeneca = ["88333A", "88333B", "88333R",\ + "88333G", "88333H", "88333K",\ + "88333V", "88333W", "88333X"] + printcodes(ebmcodes, astrazeneca) + novavax = ["88335A", "88335B",\ + "88335V", "88335W",\ + "88335G", "88335H", + "88335R", "88335K", + "88335X"] + printcodes(ebmcodes, novavax) + valneva = ["88336A", "88336B",\ + "88336V", "88336W",\ + "88336G", "88336H"] + printcodes(ebmcodes, valneva) + sanofi = ["88339A", "88339B",\ + "88339V", "88339W",\ + "88339G", "88339H", + "88339R", "88339K", + "88339X"] + printcodes(ebmcodes, valneva) + print("") + + printcodes(ebmcodes, ["88125"]) + print("") + + return 0 + +if __name__ == "__main__": + sys.exit(main(sys.argv)) +