From 801a10ae653c03d4a50744e956827a65f8f50eb4 Mon Sep 17 00:00:00 2001 From: Anon Date: Sun, 2 Feb 2025 19:48:56 -0500 Subject: [PATCH] added Readium LCP 2.5 support --- DeDRM_plugin/__init__.py | 13 +- DeDRM_plugin/lcpdedrm.py | 503 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 495 insertions(+), 21 deletions(-) diff --git a/DeDRM_plugin/__init__.py b/DeDRM_plugin/__init__.py index 8200122..aeec048 100644 --- a/DeDRM_plugin/__init__.py +++ b/DeDRM_plugin/__init__.py @@ -155,7 +155,7 @@ class DeDRM(FileTypePlugin): version = PLUGIN_VERSION_TUPLE #minimum_calibre_version = (5, 0, 0) # Python 3. minimum_calibre_version = (2, 0, 0) # Needs Calibre 1.0 minimum. 1.X untested. - file_types = set(['epub','pdf','pdb','prc','mobi','pobi','azw','azw1','azw3','azw4','azw8','tpz','kfx','kfx-zip']) + file_types = set(['epub','pdf','pdb','prc','mobi','pobi','azw','azw1','azw3','azw4','azw8','tpz','kfx','kfx-zip', 'lcpdf']) on_import = True on_preprocess = True priority = 600 @@ -1041,6 +1041,17 @@ class DeDRM(FileTypePlugin): elif booktype == 'epub': # Adobe Adept, PassHash (B&N) or LCP ePub decrypted_ebook = self.ePubDecrypt(path_to_ebook) + elif booktype == 'lcpdf': + import prefs + import lcpdedrm + dedrmprefs = prefs.DeDRM_Prefs() + if (lcpdedrm.isLCPbook(path_to_ebook)): + try: + retval = lcpdedrm.decryptLCPbook(path_to_ebook, dedrmprefs['lcp_passphrases'], self) + except: + print("Looks like that didn't work:") + raise + decrypted_ebook = retval else: print("Unknown booktype {0}. Passing back to calibre unchanged".format(booktype)) return path_to_ebook diff --git a/DeDRM_plugin/lcpdedrm.py b/DeDRM_plugin/lcpdedrm.py index a7e848f..bdc6660 100644 --- a/DeDRM_plugin/lcpdedrm.py +++ b/DeDRM_plugin/lcpdedrm.py @@ -11,37 +11,194 @@ # Revision history: # 1 - Initial release # 2 - LCP DRM code removed due to a DMCA takedown. +# 3 - LCP DRM code restored and updated. """ -This file used to contain code to remove the Readium LCP DRM -from eBooks. Unfortunately, Readium has issued a DMCA takedown -request, so I was forced to remove that code: - -https://github.com/github/dmca/blob/master/2022/01/2022-01-04-readium.md - -This file now just returns an error message when asked to remove LCP DRM. -For more information, see this issue: -https://github.com/noDRM/DeDRM_tools/issues/18 +Decrypt Readium LCP encrypted ePub and PDF books. """ __license__ = 'GPL v3' __version__ = "2" import json -from zipfile import ZipFile +import hashlib +import base64 +import zlib +import binascii +from zipfile import ZipInfo, ZipFile, ZIP_STORED, ZIP_DEFLATED from contextlib import closing +from Crypto.Cipher import AES +from lxml import etree + +# Wrap a stream so that output gets flushed immediately +# and also make sure that any unicode strings get +# encoded using "replace" before writing them. +class SafeUnbuffered: + def __init__(self, stream): + self.stream = stream + self.encoding = stream.encoding + if self.encoding == None: + self.encoding = "utf-8" + def write(self, data): + if isinstance(data,str) or isinstance(data,unicode): + # str for Python3, unicode for Python2 + data = data.encode(self.encoding,"replace") + try: + buffer = getattr(self.stream, 'buffer', self.stream) + # self.stream.buffer for Python3, self.stream for Python2 + buffer.write(data) + buffer.flush() + except: + # We can do nothing if a write fails + raise + def __getattr__(self, attr): + return getattr(self.stream, attr) + + + +class Decryptor(object): + def __init__(self, bookkey, encryption): + enc = lambda tag: '{%s}%s' % ('http://www.w3.org/2001/04/xmlenc#', tag) + dsig = lambda tag: '{%s}%s' % ('http://www.w3.org/2000/09/xmldsig#', tag) + self.book_key = bookkey + self._encryption = None + if encryption is not None: + self._encryption = etree.fromstring(encryption) + # This loops through all entries in the "encryption.xml" file + # to figure out which files need to be decrypted. + # All encrypted file paths will be added to the "encrypted" list + self._encrypted = encrypted = set() + self._other = other = set() + self._json_elements_to_remove = json_elements_to_remove = set() + self._has_remaining_xml = False + expr = './%s/%s/%s' % (enc('EncryptedData'), enc('CipherData'), + enc('CipherReference')) + for elem in self._encryption.findall(expr): + path = elem.get('URI', None) + encryption_type_url = (elem.getparent().getparent().find("./%s" % (enc('EncryptionMethod'))).get('Algorithm', None)) + retrieval_method_url = None + if (encryption_type_url == "http://www.w3.org/2001/04/xmlenc#aes256-cbc"): + try: + retrieval_method_url = (elem.getparent().getparent().find("./%s/%s" % (dsig('KeyInfo'), dsig('RetrievalMethod'))).get('Type', None)) + except: + pass + + if path is not None: + if retrieval_method_url == "http://readium.org/2014/01/lcp#EncryptedContentKey": + path = path.encode('utf-8') + encrypted.add(path) + if (self.book_key is None): + self._has_remaining_xml = True + else: + json_elements_to_remove.add(elem.getparent().getparent()) + + else: + path = path.encode('utf-8') + other.add(path) + self._has_remaining_xml = True + # Other unsupported type. + + for elem in json_elements_to_remove: + elem.getparent().remove(elem) + + def check_if_remaining(self): + return self._has_remaining_xml + + def get_xml(self): + return "\n" + etree.tostring(self._encryption, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8") + + def decompress(self, bytes): + dc = zlib.decompressobj(-15) + try: + decompressed_bytes = dc.decompress(bytes) + ex = dc.decompress(b'Z') + dc.flush() + if ex: + decompressed_bytes = decompressed_bytes + ex + except: + # possibly not compressed by zip - just return bytes + return bytes, False + return decompressed_bytes , True + + def decrypt(self, path, data): + if (len(data) > 0) and (self._encryption is None or path.encode('utf-8') in self._encrypted) and (self.book_key is not None): + aes = AES.new(self.book_key, AES.MODE_CBC, data[:16]) + data = aes.decrypt(data[16:]) + + # Fix padding + if type(data[-1]) != int: + place = ord(data[-1]) + else: + place = data[-1] + data = data[:-place] + data, was_decomp = self.decompress(data) + return data + + else: + # Not encrypted or obfuscated + return data class LCPError(Exception): pass +class LCPTransform: + + @staticmethod + def secret_transform_basic(input_hash): + # basic profile doesn't have any transformation + # Takes key input as hexdigest and outputs it as hexdigest + return input_hash + + @staticmethod + def secret_transform_profile10(input_hash): + # Takes an input sha256 hash as hexdigest and transforms that according to the profile-1.0 spec. + # This 64-byte master key is basically all that distinguishes the open source "open for everyone" version + # from the so-called "open source" closed-source-version that's actually being used by book distributors. + # 64 byte master key = 64 iterations + + # This function is what the documentation describes as "uk = userkey(h)", the "secret userkey transform" + + # 1. Take input + # 2. Hash it + # 3. Add one byte from the master key to the end of the hash + # 4. Hash that result again + # 5. Go back to 3. until you run out of bytes. + # 6. The result is the key. + + masterkey = "b3a07c4d42880e69398e05392405050efeea0664c0b638b7c986556fa9b58d77b31a40eb6a4fdba1e4537229d9f779daad1cc41ee968153cb71f27dc9696d40f" + masterkey = bytearray.fromhex(masterkey) + try: + current_hash = bytearray.fromhex(input_hash) + except: + return None + + for byte in masterkey: + current_hash.append(byte) + current_hash = bytearray(hashlib.sha256(current_hash).digest()) + return binascii.hexlify(current_hash).decode("latin-1") + + @staticmethod + def userpass_to_hash(passphrase, algorithm): + # Check for the password algorithm. The Readium LCP standard only defines SHA256. + # The hashing standard documents they link to define a couple other hash algorithms, too. + # I've never seen them actually used in an LCP-encrypted file, so I didn't bother to implement them. + + if (algorithm == "http://www.w3.org/2001/04/xmlenc#sha256"): + algo = "SHA256" + user_password_hashed = hashlib.sha256(passphrase).hexdigest() + # This seems to be the only algorithm that's actually defined in the Readium standard. + else: + print("LCP: Book is using unsupported user key algorithm: {0}".format(algorithm)) + return None, None + + return algo, user_password_hashed + + # Check file to see if this is an LCP-protected file def isLCPbook(inpath): try: with closing(ZipFile(open(inpath, 'rb'))) as lcpbook: - if ("META-INF/license.lcpl" not in lcpbook.namelist() or - "META-INF/encryption.xml" not in lcpbook.namelist() or - b"EncryptedContentKey" not in lcpbook.read("META-INF/encryption.xml")): + if ("META-INF/license.lcpl" not in lcpbook.namelist()): return False license = json.loads(lcpbook.read('META-INF/license.lcpl')) @@ -54,17 +211,323 @@ def isLCPbook(inpath): return False +# This function decrypts data with the given key +def dataDecryptLCP(b64data, hex_key): + # Decode base64 data + try: + decoded_data = base64.b64decode(b64data) + except Exception as e: + raise ValueError(f"Invalid Base64 data: {e}") + if len(decoded_data) < 16: + raise ValueError("Decoded data is too short to contain a valid IV and ciphertext.") + + # Extract IV and ciphertext + iv, cipher = decoded_data[:16], decoded_data[16:] + + # Ensure IV is exactly 16 bytes + if len(iv) != 16: + raise ValueError(f"Incorrect IV length: {len(iv)} bytes (must be 16 bytes).") + + # Convert hex key to bytes + key_bytes = binascii.unhexlify(hex_key) + + # Perform AES-CBC decryption + aes = AES.new(key_bytes, AES.MODE_CBC, iv) + decrypted = aes.decrypt(cipher) + + # Handle PKCS7 padding removal + padding = decrypted[-1] if isinstance(decrypted[-1], int) else ord(decrypted[-1]) + if padding < 1 or padding > 16: + raise ValueError("Invalid padding detected.") + + return decrypted[:-padding] + + +# This function just returns an info string about the license +# Optional. +def returnUserInfoStringForLicense(license, user_pass = None): + if not "user" in license: + return None + + user_name = None + user_email = None + + if "email" in license["user"]: + user_email = license["user"]["email"] + if "name" in license["user"]: + user_name = license["user"]["name"] + + # Sometimes these are encrypted + if "encrypted" in license["user"] and "email" in license["user"]["encrypted"]: + if user_pass is None: + user_email = None + else: + # Decrypt + try: + user_email_temp = dataDecryptLCP(user_email, user_pass) + user_email = str(user_email_temp.decode("utf-8")) + except: + # Decryption failed for whatever reason + user_email = None + + + if "encrypted" in license["user"] and "name" in license["user"]["encrypted"]: + if user_pass is None: + user_name = None + else: + # Decrypt + try: + user_name_temp = dataDecryptLCP(user_name, user_pass) + user_name = str(user_name_temp.decode("utf-8")) + except: + # Decryption failed for whatever reason + user_name = None + + if (user_name is None and user_email is None): + return None + + print_str = "" + + if ("id" in license["user"]): + print_str += "ID=" + license["user"]["id"] + ", " + + if (user_email is not None): + print_str += "Email=" + user_email + ", " + + if (user_name is not None): + print_str += "Name=" + user_name + ", " + + # Remove last comma + print_str = print_str[:-2] + return print_str + # Takes a file and a list of passphrases def decryptLCPbook(inpath, passphrases, parent_object): - if not isLCPbook(inpath): raise LCPError("This is not an LCP-encrypted book") - print("LCP: LCP DRM removal no longer supported due to a DMCA takedown request.") - print("LCP: The takedown request can be found here: ") - print("LCP: https://github.com/github/dmca/blob/master/2022/01/2022-01-04-readium.md ") - print("LCP: More information can be found in the Github repository: ") - print("LCP: https://github.com/noDRM/DeDRM_tools/issues/18 ") + file = ZipFile(open(inpath, 'rb')) + + license = json.loads(file.read('META-INF/license.lcpl')) + print("LCP: Found LCP-encrypted book {0}".format(license["id"])) + + user_info_string1 = returnUserInfoStringForLicense(license, None) + if (user_info_string1 is not None): + print("LCP: Account information: " + user_info_string1) + + # Check algorithm: + if license["encryption"]["profile"] == "http://readium.org/lcp/basic-profile": + print("LCP: Book is using lcp/basic-profile encryption.") + transform_algo = LCPTransform.secret_transform_basic + elif license["encryption"]["profile"] == "http://readium.org/lcp/profile-1.0": + print("LCP: Book is using lcp/profile-1.0 encryption") + transform_algo = LCPTransform.secret_transform_profile10 + else: + file.close() + raise LCPError("Book is using an unknown LCP encryption standard: {0}".format(license["encryption"]["profile"])) + + if ( + "algorithm" in license["encryption"]["content_key"] and + license["encryption"]["content_key"]["algorithm"] != "http://www.w3.org/2001/04/xmlenc#aes256-cbc" + ): + file.close() + raise LCPError("Book is using an unknown LCP encryption algorithm: {0}".format(license["encryption"]["content_key"]["algorithm"])) + + key_check = license["encryption"]["user_key"]["key_check"] + encrypted_content_key = license["encryption"]["content_key"]["encrypted_value"] + + # Prepare a list of encryption keys to test: + password_hashes = [] + + # Some providers hard-code the passphrase in the LCPL file. That doesn't happen often, + # but when it does, these files can be decrypted without knowing any passphrase. + + if "value" in license["encryption"]["user_key"]: + try: + password_hashes.append(binascii.hexlify(base64.decodebytes(license["encryption"]["user_key"]["value"].encode())).decode("ascii")) + except AttributeError: + # Python 2 + password_hashes.append(binascii.hexlify(base64.decodestring(license["encryption"]["user_key"]["value"].encode())).decode("ascii")) + if "hex_value" in license["encryption"]["user_key"]: + password_hashes.append(binascii.hexlify(bytearray.fromhex(license["encryption"]["user_key"]["hex_value"])).decode("ascii")) + + # Hash all the passwords provided by the user: + for possible_passphrase in passphrases: + password_hashes.append(possible_passphrase) + algo = "http://www.w3.org/2001/04/xmlenc#sha256" + if "algorithm" in license["encryption"]["user_key"]: + algo = license["encryption"]["user_key"]["algorithm"] + + algo, tmp_pw = LCPTransform.userpass_to_hash(possible_passphrase.encode('utf-8'), algo) + if tmp_pw is not None: + password_hashes.append(tmp_pw) + + # For all the password hashes, check if one of them decrypts the book: + correct_password_hash = None + + for possible_hash in password_hashes: + transformed_hash = possible_hash + print("trying {0}".format(transformed_hash)) + try: + decrypted = None + decrypted = dataDecryptLCP(key_check, transformed_hash) + except: + pass + + if (decrypted is not None and decrypted.decode("ascii", errors="ignore") == license["id"]): + # Found correct password hash, hooray! + correct_password_hash = transformed_hash + break + + transformed_hash = transform_algo(possible_hash) + print("trying {0}".format(transformed_hash)) + try: + decrypted = None + decrypted = dataDecryptLCP(key_check, transformed_hash) + except: + pass + + if (decrypted is not None and decrypted.decode("ascii", errors="ignore") == license["id"]): + # Found correct password hash, hooray! + correct_password_hash = transformed_hash + break + + + # Print an error message if none of the passwords worked + if (correct_password_hash is None): + print("LCP: Tried {0} passphrases, but none of them could decrypt the book ...".format(len(password_hashes) / 2)) + + # Print password hint, if available + if ("text_hint" in license["encryption"]["user_key"] and license["encryption"]["user_key"]["text_hint"] != ""): + print("LCP: The book distributor has given you the following passphrase hint: \"{0}\"".format(license["encryption"]["user_key"]["text_hint"])) + + print("LCP: Enter the correct passphrase in the DeDRM plugin settings, then try again.") + + # Print password reset instructions, if available + for link in license["links"]: + if ("rel" in link and link["rel"] == "hint"): + print("LCP: You may be able to find or reset your LCP passphrase on the following webpage: {0}".format(link["href"])) + break + + + file.close() + raise LCPError("No correct passphrase found") + + print("LCP: Found correct passphrase, decrypting book ...") + user_info_string2 = returnUserInfoStringForLicense(license, correct_password_hash) + if (user_info_string2 is not None): + if (user_info_string1 != user_info_string2): + print("LCP: Account information: " + user_info_string2) + + + # Take the key we found and decrypt the content key: + decrypted_content_key = dataDecryptLCP(encrypted_content_key, correct_password_hash) + + if decrypted_content_key is None: + raise LCPError("Decrypted content key is None") + + # Begin decrypting + + if 'META-INF/encryption.xml' in file.namelist(): + encryption = file.read('META-INF/encryption.xml') + else: + encryption = None + decryptor = Decryptor(decrypted_content_key, encryption) + kwds = dict(compression=ZIP_DEFLATED, allowZip64=False) + for link in license.get("links", []): + if link.get("rel") == "publication": + content_type = link.get("type") + break + + if content_type in ["application/pdf+lcp", "application/pdf"]: + # Check how many PDF files there are. + # Usually, an LCP-protected PDF/ZIP is only supposed to contain one + # PDF file, but if there are multiple, return a ZIP that contains them all. + + pdf_files = [] + for filename in file.namelist(): + if filename.endswith(".pdf"): + pdf_files.append(filename) + + if len(pdf_files) == 0: + file.close() + raise LCPError("Error: Book is an LCP-protected PDF, but doesn't contain any PDF files ...") + + elif len(pdf_files) == 1: + # One PDF file found - extract and return that. + pdfdata = file.read(pdf_files[0]) + outputname = parent_object.temporary_file(".pdf").name + print("LCP: Successfully decrypted, exporting to {0}".format(outputname)) + + with open(outputname, 'wb') as f: + f.write(decryptor.decrypt(pdf_files[0], pdfdata)) + + file.close() + return outputname + + else: + # Multiple PDFs found + outputname = parent_object.temporary_file(".zip").name + with closing(ZipFile(open(outputname, 'wb'), 'w', **kwds)) as outfile: + for path in pdf_files: + data = file.read(path) + outfile.writestr(path, decryptor.decrypt(path, data)) + + print("LCP: Successfully decrypted a multi-PDF ZIP file, exporting to {0}".format(outputname)) + file.close() + return outputname + + else: + # Not a PDF -> EPUB + + if content_type == "application/epub+zip": + outputname = parent_object.temporary_file(".epub").name + else: + outputname = parent_object.temporary_file(".zip").name + + with closing(ZipFile(open(outputname, 'wb'), 'w', **kwds)) as outfile: + + # mimetype must be 1st file. Remove from list and manually add at the beginning + namelist = file.namelist() + namelist.remove("mimetype") + namelist.remove("META-INF/license.lcpl") + + for path in (["mimetype"] + namelist): + data = file.read(path) + zi = ZipInfo(path) + + if path == "META-INF/encryption.xml": + # Check if that's still needed + if (decryptor.check_if_remaining()): + data = decryptor.get_xml() + print("LCP: Adding encryption.xml for the remaining files.") + else: + continue + + try: + oldzi = file.getinfo(path) + if path == "mimetype": + zi.compress_type = ZIP_STORED + else: + zi.compress_type = ZIP_DEFLATED + zi.date_time = oldzi.date_time + zi.comment = oldzi.comment + zi.extra = oldzi.extra + zi.internal_attr = oldzi.internal_attr + zi.external_attr = oldzi.external_attr + zi.create_system = oldzi.create_system + if any(ord(c) >= 128 for c in path) or any(ord(c) >= 128 for c in zi.comment): + # If the file name or the comment contains any non-ASCII char, set the UTF8-flag + zi.flag_bits |= 0x800 + except: + pass - raise LCPError("LCP DRM removal no longer supported") + if path == "META-INF/encryption.xml": + outfile.writestr(zi, data) + else: + outfile.writestr(zi, decryptor.decrypt(path, data)) + + print("LCP: Successfully decrypted, exporting to {0}".format(outputname)) + file.close() + return outputname \ No newline at end of file -- 2.49.1