summaryrefslogtreecommitdiff
path: root/AntennaPodDbFixer.py
blob: aff6eb61c9628618707e4ca6569a203c4feb43b0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/env python3
import subprocess
import json
import shutil
import os
import sys
import getopt

def usage():
    """Return the one-line usage synopsis, prefixed with the invoked script name."""
    script = sys.argv[0]
    return f"{script} [--help] [--verbose] [corrupt.db] [empty.db]"

def i_print(index, alternatives):
    """Print one of several alternative messages, selected by *index*.

    Args:
        index: Position of the alternative to print. A bool is accepted and
            mapped to 0 (False) or 1 (True), so a flag can select between a
            "quiet" and a "verbose" variant.
        alternatives: Sequence of dicts. Each dict must contain a "msg" key
            holding the text to print; every other key is forwarded to
            print() as a keyword argument (e.g. "end", "flush", "file").

    Raises:
        IndexError: If *index* is out of range for *alternatives*.
        KeyError: If the selected alternative has no "msg" entry.
    """
    if isinstance(index, bool):
        index = int(index)
    # Work on a copy so the caller's dict keeps its "msg" entry and the same
    # alternatives list can be passed in again.
    kwargs = dict(alternatives[index])
    msg = kwargs.pop("msg")
    print(msg, **kwargs)

def get_db_version(fileName):
    """Return the ``PRAGMA user_version`` value of *fileName* as a string.

    The value is whatever the sqlite3 command-line shell writes to stdout,
    with surrounding whitespace stripped. The exit status of sqlite3 is not
    checked here.
    """
    command = ["sqlite3", fileName, "PRAGMA user_version;"]
    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.stdout.strip()

def integrity_is_ok(fileName):
    """Run ``PRAGMA integrity_check`` on *fileName* and report the outcome.

    Prints a one-line status message; when the module-level ``verbose`` flag
    is set, also prints the equivalent shell command and sqlite3's raw output.

    Returns:
        True if sqlite3's stripped stdout is exactly "ok", False otherwise.
    """
    print("Checking integrity of " + fileName + ".", end="")
    completed = subprocess.run(
        ["sqlite3", fileName, "PRAGMA integrity_check;"],
        capture_output=True,
        text=True,
    )
    report = completed.stdout.strip()
    passed = report == "ok"
    print(" Detected no errors." if passed else " Errors found.")
    if verbose:
        print(f"sqlite3 '{fileName}' 'PRAGMA integrity_check;'")
        print(report)
    return passed

# Run a single SQL string through the sqlite3 command-line shell
def query(db, query):
    """Execute *query* against the database file *db* via the sqlite3 CLI.

    sqlite3 is started with ``-json`` and with ``-init /tmp/conf``
    (presumably so that a user's own init file cannot alter the output --
    TODO confirm).

    Returns:
        The JSON-decoded stdout of sqlite3, or the raw stdout string when it
        is not valid JSON (for example when the statement prints nothing).

    If sqlite3 exits with a non-zero status, its stderr is printed and the
    script terminates with exit status 1.
    """
    command = ["sqlite3", "-init", "/tmp/conf", "-json", db, query]
    try:
        output = subprocess.run(
            command,
            capture_output=True,
            check=True,
            text=True,
        ).stdout
    except subprocess.CalledProcessError as err:
        print(err.stderr, file=sys.stderr)
        exit(1)
    try:
        return json.loads(output)
    except json.decoder.JSONDecodeError:
        # Not JSON: hand back the text exactly as sqlite3 printed it.
        return output

# Parse the command line. gnu_getopt lets options and file paths be mixed in
# any order. Note that the complete sys.argv is handed over, so the returned
# positional list starts with the script name: args[0] is the script,
# args[1] the corrupted database and args[2] the empty database.
try:
    opts, args = getopt.gnu_getopt(sys.argv, "hv", ["help", "verbose"])
except getopt.GetoptError:
    print(usage(), file=sys.stderr)
    sys.exit(1)

verbose = False
for o, a in opts:
    if o in ("-v", "--verbose"):
        verbose = True
    elif o in ("-h", "--help"):
        print(usage())
        sys.exit()
    else:
        assert False, "Bug: option " + o + " not handled."

# args[0] is always the script name, so a database path was only given when
# there are at least two entries; otherwise ask for it interactively.
if len(args) >= 2:
    inputFilePath = args[1]
else:
    inputFilePath = input("Enter file path to your corrupted AntennaPod database: ")

# Look for the dbpage compile-time symbol inside the sqlite3 binary found on
# PATH; the messages below state that .recover requires this extension.
DBPAGE_SYM = "ENABLE_DBPAGE_VTAB"
symbol_probe = "strings $( which sqlite3 ) | grep " + DBPAGE_SYM
probe_result = subprocess.run(
    ["sh", "-c", symbol_probe],
    capture_output=True,
    text=True,
)
dbpage_ext = probe_result.stdout.strip()
if dbpage_ext == DBPAGE_SYM:
    if verbose:
        print("It seems sqlite3 on this system has the dbpage extension. Good!")
else:
    print("Could not detect sqlite3 dbpage extension. Aborting.", file=sys.stderr)
    print("https://sqlite.org/dbpage.html (required by .recover)", file=sys.stderr)
    exit(1)

# Determine the schema version of the corrupted database.
corruptedVersion = get_db_version(inputFilePath)
# An empty string means sqlite3 wrote no version to stdout at all; like "0"
# it leaves nothing that an empty database could be matched against.
if corruptedVersion in ("", "0"):
    print("Error: File not found, not a database, or too corrupted for this script.", file=sys.stderr)
    sys.exit(1)
print("Corrupted file version: " + corruptedVersion)

# Informational only: the result is not acted upon for the corrupted file.
integrity_is_ok(inputFilePath)

# getopt was given the full sys.argv, so args[0] is the script name and
# args[1] the corrupted database; the empty database is therefore args[2].
if len(args) >= 3:
    emptyFilePath = args[2]
else:
    emptyFilePath = "empty/" + corruptedVersion + ".db"
if not os.path.isfile(emptyFilePath):
    print("If needed, you can download old app versions on F-Droid and export an empty database.")
    emptyFilePath = input("Enter file path to an EMPTY AntennaPod database with the same version: ")
emptyVersion = get_db_version(emptyFilePath)
print("Empty file version: " + emptyVersion)
if corruptedVersion != emptyVersion:
    print("Error: Application version differs between database files.", file=sys.stderr)
    sys.exit(1)
print()

# Output and temporary files are all placed next to the input file.
repairedFilePath = inputFilePath + "-repaired.db"
sqlFilePath = inputFilePath + ".sql.tmp"
workingcopyFilePath = inputFilePath + ".tmp"

# The empty database becomes the base of the repaired file, so it must be sound.
if not integrity_is_ok(emptyFilePath):
    print(f"Errors found in {emptyFilePath}. Giving up!", file=sys.stderr)
    sys.exit(1)

# Start the repaired database as a copy of the empty one and remove
# temporary files left behind by an earlier run.
if verbose:
    print(f"cp '{emptyFilePath}' '{repairedFilePath}'")
shutil.copyfile(emptyFilePath, repairedFilePath, follow_symlinks=True)
if os.path.exists(sqlFilePath): os.remove(sqlFilePath)
if os.path.exists(workingcopyFilePath): os.remove(workingcopyFilePath)

# Recover to SQL commands and insert back into a database
print("Recovering database.")
if verbose:
    print(f"sqlite3 '{inputFilePath}' '.recover --ignore-freelist' >'{sqlFilePath}'")
# Context managers make sure every handle on the dump file is closed before
# the next step reopens it, even if a step raises.
with open(sqlFilePath, 'wb') as f:
    subprocess.run(
            ["sqlite3", inputFilePath, ".recover --ignore-freelist"],
            check=True,
            stdout=f
        )
# The dump is rewritten as raw bytes: data recovered from a corrupted file is
# not guaranteed to decode in the locale's text encoding.
with open(sqlFilePath, 'rb') as f:
    filedata = f.read()
with open(sqlFilePath, 'wb') as f:
    # Avoid a warning that could be confusing to users
    f.write(filedata.replace(b"CREATE TABLE sqlite_sequence(name,seq);", b""))
if verbose:
    print(f"sqlite3 '{workingcopyFilePath}' <'{sqlFilePath}'")
# The exit status of the import is not checked (unchanged from before).
with open(sqlFilePath, 'rb') as f:
    subprocess.run(["sqlite3", workingcopyFilePath], stdin=f)
print()


def _report_step(command_text):
    """Print a progress dot, or *command_text* instead when verbose is set."""
    # Fresh dicts are built on every call so no state is shared between steps.
    i_print(verbose, [
            {"msg": ".", "end": "", "flush": True},
            {"msg": command_text},
        ])

# For every table in the empty database's schema: read its column list,
# clear the table in the repaired file, then fill it from the recovered
# working copy using exactly those columns.
tables = query(emptyFilePath, "SELECT name FROM sqlite_schema WHERE type='table';")
for schema_row in tables:
    table = schema_row["name"]
    print("Copying " + table, end="", flush=True)

    sql = f"SELECT GROUP_CONCAT(NAME,',') AS columns FROM PRAGMA_TABLE_INFO('{table}')"
    _report_step(f"\nsqlite3 '{emptyFilePath}'\n{sql};")
    columns = query(emptyFilePath, sql)[0]["columns"]

    sql ="DELETE FROM " + table
    _report_step(f"sqlite3 '{repairedFilePath}'\n{sql};")
    query(repairedFilePath, sql)

    sql = (f"attach '{workingcopyFilePath}' AS old; INSERT INTO main.{table} "
        + f"SELECT {columns} from old.{table};")
    _report_step(f"{sql}")
    query(repairedFilePath, sql)
    print()
print()

# Cleanup: delete the SQL dump and the recovered working copy
if verbose:
    print(f"rm '{sqlFilePath}' '{workingcopyFilePath}'")
for leftover in (sqlFilePath, workingcopyFilePath):
    os.remove(leftover)

# Final verdict on the repaired file
if integrity_is_ok(repairedFilePath):
    print("Done. Output file: " + repairedFilePath)
else:
    print(f"Integrity check of {repairedFilePath} still found errors.",
        file=sys.stderr)
    exit(1)