]> xmof Git - DeDRM.git/commitdiff
added Readium LCP 2.5 support
authorAnon <anon@example.com>
Mon, 3 Feb 2025 00:48:56 +0000 (19:48 -0500)
committerxmoforf <xmoforf@disroot.org>
Sat, 4 Oct 2025 22:38:01 +0000 (18:38 -0400)
DeDRM_plugin/__init__.py
DeDRM_plugin/lcpdedrm.py

index bacea0316c1e81f292df81d8c3adcc7eb0c72a63..1d69c7607fc3aec7fa2829db81fd682026f57a5f 100644 (file)
@@ -155,7 +155,7 @@ class DeDRM(FileTypePlugin):
     version                 = PLUGIN_VERSION_TUPLE
     #minimum_calibre_version = (5, 0, 0)  # Python 3.
     minimum_calibre_version = (2, 0, 0)  # Needs Calibre 1.0 minimum. 1.X untested.
-    file_types              = set(['epub','pdf','pdb','prc','mobi','pobi','azw','azw1','azw3','azw4','azw8','tpz','kfx','kfx-zip'])
+    file_types              = set(['epub','pdf','pdb','prc','mobi','pobi','azw','azw1','azw3','azw4','azw8','tpz','kfx','kfx-zip', 'lcpdf'])
     on_import               = True
     on_preprocess           = True
     priority                = 600
@@ -1041,6 +1041,17 @@ class DeDRM(FileTypePlugin):
         elif booktype == 'epub':
             # Adobe Adept, PassHash (B&N) or LCP ePub
             decrypted_ebook = self.ePubDecrypt(path_to_ebook)
+        elif booktype == 'lcpdf':
+            import prefs
+            import lcpdedrm
+            dedrmprefs = prefs.DeDRM_Prefs()
+            if (lcpdedrm.isLCPbook(path_to_ebook)):
+                try: 
+                    retval = lcpdedrm.decryptLCPbook(path_to_ebook, dedrmprefs['lcp_passphrases'], self)
+                except:
+                    print("Looks like that didn't work:")
+                    raise
+                decrypted_ebook =  retval
         else:
             print("Unknown booktype {0}. Passing back to calibre unchanged".format(booktype))
             return path_to_ebook
index a7e848f7bcb5a5ea309913efe186919b5140cc92..bdc66609977392c8bb553254199272f40b8ce3ac 100644 (file)
 # Revision history:
 #   1 - Initial release
 #   2 - LCP DRM code removed due to a DMCA takedown.
+#   3 - LCP DRM code restored and updated.
 
 """
-This file used to contain code to remove the Readium LCP DRM
-from eBooks. Unfortunately, Readium has issued a DMCA takedown 
-request, so I was forced to remove that code: 
-
-https://github.com/github/dmca/blob/master/2022/01/2022-01-04-readium.md
-
-This file now just returns an error message when asked to remove LCP DRM.
-For more information, see this issue: 
-https://github.com/noDRM/DeDRM_tools/issues/18 
+Decrypt Readium LCP encrypted ePub and PDF books.
 """
 
 __license__ = 'GPL v3'
 __version__ = "2"
 
 import json
-from zipfile import ZipFile
+import hashlib
+import base64
+import zlib
+import binascii
+from zipfile import ZipInfo, ZipFile, ZIP_STORED, ZIP_DEFLATED
 from contextlib import closing
+from Crypto.Cipher import AES
+from lxml import etree
+
+# Wrap a stream so that output gets flushed immediately
+# and also make sure that any unicode strings get
+# encoded using "replace" before writing them.
+class SafeUnbuffered:
+    def __init__(self, stream):
+        self.stream = stream
+        self.encoding = stream.encoding
+        if self.encoding == None:
+            self.encoding = "utf-8"
+    def write(self, data):
+        if isinstance(data,str) or isinstance(data,unicode):
+            # str for Python3, unicode for Python2
+            data = data.encode(self.encoding,"replace")
+        try:
+            buffer = getattr(self.stream, 'buffer', self.stream)
+            # self.stream.buffer for Python3, self.stream for Python2
+            buffer.write(data)
+            buffer.flush()
+        except:
+            # We can do nothing if a write fails
+            raise
+    def __getattr__(self, attr):
+        return getattr(self.stream, attr)
+
+
+
+class Decryptor(object):
+    def __init__(self, bookkey, encryption):
+        enc = lambda tag: '{%s}%s' % ('http://www.w3.org/2001/04/xmlenc#', tag)
+        dsig = lambda tag: '{%s}%s' % ('http://www.w3.org/2000/09/xmldsig#', tag)
+        self.book_key = bookkey
+        self._encryption = None
+        if encryption is not None:
+            self._encryption = etree.fromstring(encryption)
+            # This loops through all entries in the "encryption.xml" file
+            # to figure out which files need to be decrypted.
+            # All encrypted file paths will be added to the "encrypted" list
+            self._encrypted = encrypted = set()
+            self._other = other = set()
 
+            self._json_elements_to_remove = json_elements_to_remove = set()
+            self._has_remaining_xml = False
+            expr = './%s/%s/%s' % (enc('EncryptedData'), enc('CipherData'),
+                                enc('CipherReference'))
+            for elem in self._encryption.findall(expr):
+                path = elem.get('URI', None)
+                encryption_type_url = (elem.getparent().getparent().find("./%s" % (enc('EncryptionMethod'))).get('Algorithm', None))
+                retrieval_method_url = None
+                if (encryption_type_url == "http://www.w3.org/2001/04/xmlenc#aes256-cbc"):
+                    try: 
+                        retrieval_method_url = (elem.getparent().getparent().find("./%s/%s" % (dsig('KeyInfo'), dsig('RetrievalMethod'))).get('Type', None))
+                    except:
+                        pass
+
+                if path is not None:
+                    if retrieval_method_url == "http://readium.org/2014/01/lcp#EncryptedContentKey":
+                        path = path.encode('utf-8')
+                        encrypted.add(path)
+                        if (self.book_key is None):
+                            self._has_remaining_xml = True
+                        else:
+                            json_elements_to_remove.add(elem.getparent().getparent())
+
+                    else: 
+                        path = path.encode('utf-8')
+                        other.add(path)
+                        self._has_remaining_xml = True
+                        # Other unsupported type.
+            
+            for elem in json_elements_to_remove:
+                elem.getparent().remove(elem)
+
+    def check_if_remaining(self):
+        return self._has_remaining_xml
+
+    def get_xml(self):
+        return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + etree.tostring(self._encryption, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
+
+    def decompress(self, bytes):
+        dc = zlib.decompressobj(-15)
+        try:
+            decompressed_bytes = dc.decompress(bytes)
+            ex = dc.decompress(b'Z') + dc.flush()
+            if ex:
+                decompressed_bytes = decompressed_bytes + ex
+        except:
+            # possibly not compressed by zip - just return bytes
+            return bytes, False
+        return decompressed_bytes , True
+    
+    def decrypt(self, path, data):
+        if (len(data) > 0) and (self._encryption is None or path.encode('utf-8') in self._encrypted) and (self.book_key is not None):
+            aes = AES.new(self.book_key, AES.MODE_CBC, data[:16])
+            data = aes.decrypt(data[16:])
+            
+            # Fix padding
+            if type(data[-1]) != int:
+                place = ord(data[-1])
+            else:
+                place = data[-1]
+            data = data[:-place]
+            data, was_decomp = self.decompress(data)
+            return data
+
+        else: 
+            # Not encrypted or obfuscated
+            return data
 
 class LCPError(Exception):
     pass
 
+class LCPTransform: 
+
+    @staticmethod
+    def secret_transform_basic(input_hash):
+        # basic profile doesn't have any transformation
+        # Takes key input as hexdigest and outputs it as hexdigest
+        return input_hash
+
+    @staticmethod
+    def secret_transform_profile10(input_hash): 
+        # Takes an input sha256 hash as hexdigest and transforms that according to the profile-1.0 spec. 
+        # This 64-byte master key is basically all that distinguishes the open source "open for everyone" version
+        # from the so-called "open source" closed-source-version that's actually being used by book distributors.
+        # 64 byte master key = 64 iterations
+
+        # This function is what the documentation describes as "uk = userkey(h)", the "secret userkey transform"
+
+        # 1. Take input
+        # 2. Hash it
+        # 3. Add one byte from the master key to the end of the hash
+        # 4. Hash that result again
+        # 5. Go back to 3. until you run out of bytes. 
+        # 6. The result is the key.
+
+        masterkey = "b3a07c4d42880e69398e05392405050efeea0664c0b638b7c986556fa9b58d77b31a40eb6a4fdba1e4537229d9f779daad1cc41ee968153cb71f27dc9696d40f"
+        masterkey = bytearray.fromhex(masterkey)
+        try:
+            current_hash = bytearray.fromhex(input_hash)
+        except:
+            return None
+
+        for byte in masterkey:
+            current_hash.append(byte)
+            current_hash = bytearray(hashlib.sha256(current_hash).digest())
+        return binascii.hexlify(current_hash).decode("latin-1")
+
+    @staticmethod
+    def userpass_to_hash(passphrase, algorithm):
+        # Check for the password algorithm. The Readium LCP standard only defines SHA256.
+        # The hashing standard documents they link to define a couple other hash algorithms, too. 
+        # I've never seen them actually used in an LCP-encrypted file, so I didn't bother to implement them. 
+
+        if (algorithm == "http://www.w3.org/2001/04/xmlenc#sha256"):
+            algo = "SHA256"
+            user_password_hashed = hashlib.sha256(passphrase).hexdigest()
+            # This seems to be the only algorithm that's actually defined in the Readium standard.
+        else:
+            print("LCP: Book is using unsupported user key algorithm: {0}".format(algorithm))
+            return None, None
+
+        return algo, user_password_hashed
+
+
 # Check file to see if this is an LCP-protected file
 def isLCPbook(inpath):
     try: 
         with closing(ZipFile(open(inpath, 'rb'))) as lcpbook:
-            if ("META-INF/license.lcpl" not in lcpbook.namelist() or
-                "META-INF/encryption.xml" not in lcpbook.namelist() or
-                b"EncryptedContentKey" not in lcpbook.read("META-INF/encryption.xml")):
+            if ("META-INF/license.lcpl" not in lcpbook.namelist()):
                 return False
 
             license = json.loads(lcpbook.read('META-INF/license.lcpl'))
@@ -54,17 +211,323 @@ def isLCPbook(inpath):
     
     return False
 
+# This function decrypts data with the given key
+def dataDecryptLCP(b64data, hex_key):
+    # Decode base64 data
+    try: 
+        decoded_data = base64.b64decode(b64data)
+    except Exception as e:
+        raise ValueError(f"Invalid Base64 data: {e}")
+    if len(decoded_data) < 16:
+        raise ValueError("Decoded data is too short to contain a valid IV and ciphertext.")
+
+    # Extract IV and ciphertext
+    iv, cipher = decoded_data[:16], decoded_data[16:]
+
+    # Ensure IV is exactly 16 bytes
+    if len(iv) != 16:
+        raise ValueError(f"Incorrect IV length: {len(iv)} bytes (must be 16 bytes).")
+
+    # Convert hex key to bytes
+    key_bytes = binascii.unhexlify(hex_key)
+
+    # Perform AES-CBC decryption
+    aes = AES.new(key_bytes, AES.MODE_CBC, iv)
+    decrypted = aes.decrypt(cipher)
+
+    # Handle PKCS7 padding removal
+    padding = decrypted[-1] if isinstance(decrypted[-1], int) else ord(decrypted[-1])
+    if padding < 1 or padding > 16:
+        raise ValueError("Invalid padding detected.")
+    
+    return decrypted[:-padding]
+
+
+# This function just returns an info string about the license
+# Optional.
+def returnUserInfoStringForLicense(license, user_pass = None):
+    if not "user" in license:
+        return None
+
+    user_name = None
+    user_email = None
+
+    if "email" in license["user"]:
+        user_email = license["user"]["email"]
+    if "name" in license["user"]:
+        user_name = license["user"]["name"]
+
+    # Sometimes these are encrypted
+    if "encrypted" in license["user"] and "email" in license["user"]["encrypted"]:
+        if user_pass is None:
+            user_email = None
+        else:
+            # Decrypt
+            try: 
+                user_email_temp = dataDecryptLCP(user_email, user_pass)
+                user_email = str(user_email_temp.decode("utf-8"))
+            except:
+                # Decryption failed for whatever reason
+                user_email = None
+            
+    
+    if "encrypted" in license["user"] and "name" in license["user"]["encrypted"]:
+        if user_pass is None:
+            user_name = None
+        else:
+            # Decrypt
+            try: 
+                user_name_temp = dataDecryptLCP(user_name, user_pass)
+                user_name = str(user_name_temp.decode("utf-8"))
+            except:
+                # Decryption failed for whatever reason
+                user_name = None
+
+    if (user_name is None and user_email is None):
+        return None
+
+    print_str = ""
+
+    if ("id" in license["user"]):
+        print_str += "ID=" + license["user"]["id"] + ", "
+    
+    if (user_email is not None):
+        print_str += "Email=" + user_email + ", "
+
+    if (user_name is not None):
+        print_str += "Name=" + user_name + ", "
+
+    # Remove last comma
+    print_str = print_str[:-2]
+    return print_str
+
 
 # Takes a file and a list of passphrases
 def decryptLCPbook(inpath, passphrases, parent_object):
-
     if not isLCPbook(inpath):
         raise LCPError("This is not an LCP-encrypted book")
 
-    print("LCP: LCP DRM removal no longer supported due to a DMCA takedown request.")
-    print("LCP: The takedown request can be found here: ")
-    print("LCP: https://github.com/github/dmca/blob/master/2022/01/2022-01-04-readium.md ")
-    print("LCP: More information can be found in the Github repository: ")
-    print("LCP: https://github.com/noDRM/DeDRM_tools/issues/18 ")
+    file = ZipFile(open(inpath, 'rb'))
+
+    license = json.loads(file.read('META-INF/license.lcpl'))
+    print("LCP: Found LCP-encrypted book {0}".format(license["id"]))
+    
+    user_info_string1 = returnUserInfoStringForLicense(license, None)
+    if (user_info_string1 is not None):
+        print("LCP: Account information: " + user_info_string1)
+
+    # Check algorithm:
+    if license["encryption"]["profile"] == "http://readium.org/lcp/basic-profile":
+        print("LCP: Book is using lcp/basic-profile encryption.")
+        transform_algo = LCPTransform.secret_transform_basic
+    elif license["encryption"]["profile"] == "http://readium.org/lcp/profile-1.0":
+        print("LCP: Book is using lcp/profile-1.0 encryption")
+        transform_algo = LCPTransform.secret_transform_profile10
+    else: 
+        file.close()
+        raise LCPError("Book is using an unknown LCP encryption standard: {0}".format(license["encryption"]["profile"]))
+
+    if (
+        "algorithm" in license["encryption"]["content_key"] and 
+        license["encryption"]["content_key"]["algorithm"] != "http://www.w3.org/2001/04/xmlenc#aes256-cbc"
+        ):
+        file.close()
+        raise LCPError("Book is using an unknown LCP encryption algorithm: {0}".format(license["encryption"]["content_key"]["algorithm"]))
+
+    key_check = license["encryption"]["user_key"]["key_check"]
+    encrypted_content_key = license["encryption"]["content_key"]["encrypted_value"]
+
+    # Prepare a list of encryption keys to test:
+    password_hashes = []
+    
+    # Some providers hard-code the passphrase in the LCPL file. That doesn't happen often,
+    # but when it does, these files can be decrypted without knowing any passphrase.
+
+    if "value" in license["encryption"]["user_key"]:
+        try: 
+            password_hashes.append(binascii.hexlify(base64.decodebytes(license["encryption"]["user_key"]["value"].encode())).decode("ascii"))
+        except AttributeError:
+            # Python 2
+            password_hashes.append(binascii.hexlify(base64.decodestring(license["encryption"]["user_key"]["value"].encode())).decode("ascii"))
+    if "hex_value" in license["encryption"]["user_key"]:
+        password_hashes.append(binascii.hexlify(bytearray.fromhex(license["encryption"]["user_key"]["hex_value"])).decode("ascii"))
+
+    # Hash all the passwords provided by the user:
+    for possible_passphrase in passphrases:
+        password_hashes.append(possible_passphrase)
+        algo = "http://www.w3.org/2001/04/xmlenc#sha256"
+        if "algorithm" in license["encryption"]["user_key"]:
+            algo = license["encryption"]["user_key"]["algorithm"]
+
+        algo, tmp_pw = LCPTransform.userpass_to_hash(possible_passphrase.encode('utf-8'), algo)
+        if tmp_pw is not None: 
+            password_hashes.append(tmp_pw)
+
+    # For all the password hashes, check if one of them decrypts the book:
+    correct_password_hash = None
+
+    for possible_hash in password_hashes:
+        transformed_hash = possible_hash
+        print("trying {0}".format(transformed_hash))
+        try: 
+            decrypted = None
+            decrypted = dataDecryptLCP(key_check, transformed_hash)
+        except:
+            pass
+
+        if (decrypted is not None and decrypted.decode("ascii", errors="ignore") == license["id"]):
+            # Found correct password hash, hooray!
+            correct_password_hash = transformed_hash
+            break
+
+        transformed_hash = transform_algo(possible_hash)
+        print("trying {0}".format(transformed_hash))
+        try: 
+            decrypted = None
+            decrypted = dataDecryptLCP(key_check, transformed_hash)
+        except:
+            pass
+
+        if (decrypted is not None and decrypted.decode("ascii", errors="ignore") == license["id"]):
+            # Found correct password hash, hooray!
+            correct_password_hash = transformed_hash
+            break
+
+
+    # Print an error message if none of the passwords worked
+    if (correct_password_hash is None):
+        print("LCP: Tried {0} passphrases, but none of them could decrypt the book ...".format(len(password_hashes) / 2))
+        
+        # Print password hint, if available
+        if ("text_hint" in license["encryption"]["user_key"] and license["encryption"]["user_key"]["text_hint"] != ""):
+            print("LCP: The book distributor has given you the following passphrase hint: \"{0}\"".format(license["encryption"]["user_key"]["text_hint"]))
+
+        print("LCP: Enter the correct passphrase in the DeDRM plugin settings, then try again.")
+        
+        # Print password reset instructions, if available
+        for link in license["links"]:
+            if ("rel" in link and link["rel"] == "hint"):
+                print("LCP: You may be able to find or reset your LCP passphrase on the following webpage: {0}".format(link["href"]))
+                break
+
+        
+        file.close()
+        raise LCPError("No correct passphrase found")
+
+    print("LCP: Found correct passphrase, decrypting book ...")
+    user_info_string2 = returnUserInfoStringForLicense(license, correct_password_hash)
+    if (user_info_string2 is not None):
+        if (user_info_string1 != user_info_string2):
+            print("LCP: Account information: " + user_info_string2)
+
+
+    # Take the key we found and decrypt the content key:
+    decrypted_content_key = dataDecryptLCP(encrypted_content_key, correct_password_hash)
+
+    if decrypted_content_key is None:
+        raise LCPError("Decrypted content key is None")
+
+    # Begin decrypting
+
+    if 'META-INF/encryption.xml' in file.namelist():
+        encryption = file.read('META-INF/encryption.xml')
+    else:
+        encryption = None
+    decryptor = Decryptor(decrypted_content_key, encryption)
+    kwds = dict(compression=ZIP_DEFLATED, allowZip64=False)
+    for link in license.get("links", []):
+        if link.get("rel") == "publication":
+            content_type = link.get("type")
+            break
+    
+    if content_type in ["application/pdf+lcp", "application/pdf"]:
+        # Check how many PDF files there are. 
+        # Usually, an LCP-protected PDF/ZIP is only supposed to contain one 
+        # PDF file, but if there are multiple, return a ZIP that contains them all.
+
+        pdf_files = []
+        for filename in file.namelist():
+            if filename.endswith(".pdf"):
+                pdf_files.append(filename)
+
+        if len(pdf_files) == 0:
+            file.close()
+            raise LCPError("Error: Book is an LCP-protected PDF, but doesn't contain any PDF files ...")
+        
+        elif len(pdf_files) == 1:
+            # One PDF file found - extract and return that.
+            pdfdata = file.read(pdf_files[0])
+            outputname = parent_object.temporary_file(".pdf").name
+            print("LCP: Successfully decrypted, exporting to {0}".format(outputname))
+
+            with open(outputname, 'wb') as f:
+                f.write(decryptor.decrypt(pdf_files[0], pdfdata))
+            
+            file.close()
+            return outputname
+                
+        else:
+            # Multiple PDFs found
+            outputname = parent_object.temporary_file(".zip").name
+            with closing(ZipFile(open(outputname, 'wb'), 'w', **kwds)) as outfile:
+                for path in pdf_files:
+                    data = file.read(path)
+                    outfile.writestr(path, decryptor.decrypt(path, data))
+
+            print("LCP: Successfully decrypted a multi-PDF ZIP file, exporting to {0}".format(outputname))
+            file.close()
+            return outputname
+
+    else:
+        # Not a PDF -> EPUB
+
+        if content_type == "application/epub+zip":
+            outputname = parent_object.temporary_file(".epub").name
+        else:
+            outputname = parent_object.temporary_file(".zip").name
+
+        with closing(ZipFile(open(outputname, 'wb'), 'w', **kwds)) as outfile:
+
+            # mimetype must be 1st file. Remove from list and manually add at the beginning
+            namelist = file.namelist()
+            namelist.remove("mimetype")
+            namelist.remove("META-INF/license.lcpl")
+
+            for path in (["mimetype"] + namelist):
+                data = file.read(path)
+                zi = ZipInfo(path)
+
+                if path == "META-INF/encryption.xml":
+                    # Check if that's still needed
+                    if (decryptor.check_if_remaining()):
+                        data = decryptor.get_xml()
+                        print("LCP: Adding encryption.xml for the remaining files.")
+                    else:
+                        continue
+                
+                try:
+                    oldzi = file.getinfo(path)
+                    if path == "mimetype":
+                        zi.compress_type = ZIP_STORED
+                    else:
+                        zi.compress_type = ZIP_DEFLATED
+                    zi.date_time = oldzi.date_time
+                    zi.comment = oldzi.comment
+                    zi.extra = oldzi.extra
+                    zi.internal_attr = oldzi.internal_attr
+                    zi.external_attr = oldzi.external_attr
+                    zi.create_system = oldzi.create_system
+                    if any(ord(c) >= 128 for c in path) or any(ord(c) >= 128 for c in zi.comment):
+                        # If the file name or the comment contains any non-ASCII char, set the UTF8-flag
+                        zi.flag_bits |= 0x800
+                except:
+                    pass
 
-    raise LCPError("LCP DRM removal no longer supported")
+                if path == "META-INF/encryption.xml":
+                    outfile.writestr(zi, data)
+                else:
+                    outfile.writestr(zi, decryptor.decrypt(path, data))
+        
+        print("LCP: Successfully decrypted, exporting to {0}".format(outputname))
+        file.close()
+        return outputname
\ No newline at end of file