# Revision history:
# 1 - Initial release
# 2 - LCP DRM code removed due to a DMCA takedown.
+# 3 - LCP DRM code restored and updated.
"""
-This file used to contain code to remove the Readium LCP DRM
-from eBooks. Unfortunately, Readium has issued a DMCA takedown
-request, so I was forced to remove that code:
-
-https://github.com/github/dmca/blob/master/2022/01/2022-01-04-readium.md
-
-This file now just returns an error message when asked to remove LCP DRM.
-For more information, see this issue:
-https://github.com/noDRM/DeDRM_tools/issues/18
+Decrypt Readium LCP encrypted ePub and PDF books.
"""
__license__ = 'GPL v3'
__version__ = "2"
import json
-from zipfile import ZipFile
+import hashlib
+import base64
+import zlib
+import binascii
+from zipfile import ZipInfo, ZipFile, ZIP_STORED, ZIP_DEFLATED
from contextlib import closing
+from Crypto.Cipher import AES
+from lxml import etree
+
# Wrap a stream so that output gets flushed immediately
# and also make sure that any unicode strings get
# encoded using "replace" before writing them.
class SafeUnbuffered:
    """Stream wrapper that encodes text leniently and flushes after every write.

    Text is encoded with the wrapped stream's encoding (UTF-8 when the stream
    has none) using the "replace" error handler, so unencodable characters
    never raise. Bytes are passed through untouched. Every attribute other
    than ``write`` is delegated to the wrapped stream.
    """

    def __init__(self, stream):
        self.stream = stream
        # Streams without an ``encoding`` attribute (or with it set to None)
        # fall back to UTF-8.
        self.encoding = getattr(stream, "encoding", None) or "utf-8"

    def write(self, data):
        """Write *data* (text or bytes) to the underlying stream and flush it."""
        # Anything that is not already binary is text: ``str`` on Python 3,
        # ``unicode`` on Python 2. Testing for bytes avoids referencing the
        # ``unicode`` name, which does not exist on Python 3.
        if not isinstance(data, (bytes, bytearray)):
            data = data.encode(self.encoding, "replace")
        # self.stream.buffer for Python3, self.stream for Python2
        buffer = getattr(self.stream, 'buffer', self.stream)
        buffer.write(data)
        buffer.flush()

    def __getattr__(self, attr):
        # Only called for attributes not found on the wrapper itself. Guard
        # "stream" so a half-initialised instance cannot recurse forever.
        if attr == "stream":
            raise AttributeError(attr)
        return getattr(self.stream, attr)
+
+
+
class Decryptor(object):
    """Decrypt individual archive members with the LCP content key.

    The optional ``encryption.xml`` document decides which members are
    LCP-encrypted. When no document is supplied, every non-empty member is
    treated as encrypted.
    """

    _XMLENC_NS = 'http://www.w3.org/2001/04/xmlenc#'
    _XMLDSIG_NS = 'http://www.w3.org/2000/09/xmldsig#'
    _AES256_CBC = "http://www.w3.org/2001/04/xmlenc#aes256-cbc"
    _LCP_CONTENT_KEY = "http://readium.org/2014/01/lcp#EncryptedContentKey"

    def __init__(self, bookkey, encryption):
        """Parse *encryption* (raw encryption.xml or None) for use with *bookkey*.

        Entries that this object can decrypt are removed from the parsed
        document, so get_xml() only describes what is still protected.
        """
        enc = lambda tag: '{%s}%s' % (self._XMLENC_NS, tag)
        dsig = lambda tag: '{%s}%s' % (self._XMLDSIG_NS, tag)
        self.book_key = bookkey
        self._encryption = None
        # Always create the bookkeeping containers so the other methods work
        # even when there is no encryption.xml to inspect.
        self._encrypted = encrypted = set()
        self._other = other = set()
        self._json_elements_to_remove = elements_to_remove = set()
        self._has_remaining_xml = False
        if encryption is None:
            return

        self._encryption = etree.fromstring(encryption)
        # This loops through all entries in the "encryption.xml" file
        # to figure out which files need to be decrypted.
        # All encrypted file paths will be added to the "encrypted" list
        expr = './%s/%s/%s' % (enc('EncryptedData'), enc('CipherData'),
                               enc('CipherReference'))
        for elem in self._encryption.findall(expr):
            path = elem.get('URI', None)
            encrypted_data = elem.getparent().getparent()

            method = encrypted_data.find("./%s" % (enc('EncryptionMethod')))
            encryption_type_url = None
            if method is not None:
                encryption_type_url = method.get('Algorithm', None)

            retrieval_method_url = None
            if encryption_type_url == self._AES256_CBC:
                retrieval = encrypted_data.find(
                    "./%s/%s" % (dsig('KeyInfo'), dsig('RetrievalMethod')))
                if retrieval is not None:
                    retrieval_method_url = retrieval.get('Type', None)

            if path is None:
                continue
            path = path.encode('utf-8')

            if retrieval_method_url == self._LCP_CONTENT_KEY:
                encrypted.add(path)
                if self.book_key is None:
                    # Without a key the entry stays encrypted, keep its XML.
                    self._has_remaining_xml = True
                else:
                    elements_to_remove.add(encrypted_data)
            else:
                # Other unsupported type.
                other.add(path)
                self._has_remaining_xml = True

        # Removal is deferred until after the loop so the tree is not
        # modified while findall() results are being walked.
        for elem in elements_to_remove:
            elem.getparent().remove(elem)

    def check_if_remaining(self):
        """Return True when encryption.xml still lists entries that were not handled."""
        return self._has_remaining_xml

    def get_xml(self):
        """Serialize the (pruned) encryption.xml document as text with an XML declaration."""
        return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + etree.tostring(self._encryption, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")

    def decompress(self, bytes):
        """Try to inflate *bytes* as a raw deflate stream.

        Returns ``(data, True)`` on success and ``(bytes, False)`` when the
        input is not a valid deflate stream.
        """
        inflater = zlib.decompressobj(-15)
        try:
            inflated = inflater.decompress(bytes)
            # Feed one extra byte and flush to collect any pending output.
            inflated += inflater.decompress(b'Z') + inflater.flush()
        except zlib.error:
            # possibly not compressed by zip - just return bytes
            return bytes, False
        return inflated, True

    def decrypt(self, path, data):
        """Return the plaintext of archive member *path* given its stored *data*.

        Members that are empty, not listed as LCP-encrypted, or that cannot
        be decrypted because no key is available are returned unchanged.

        Raises:
            ValueError: the decrypted data does not end in a valid padding
                length (1-16), which indicates a wrong key or corrupt data.
        """
        if len(data) == 0 or self.book_key is None:
            return data
        if self._encryption is not None and path.encode('utf-8') not in self._encrypted:
            # Not encrypted or obfuscated
            return data

        # The first 16 bytes are the IV, the remainder is the ciphertext.
        aes = AES.new(self.book_key, AES.MODE_CBC, data[:16])
        data = aes.decrypt(data[16:])
        if len(data) == 0:
            raise ValueError("LCP: encrypted resource {0} contains no ciphertext".format(path))

        # Fix padding: the last byte holds the number of padding bytes.
        # It is an int on Python 3 and a one-character str on Python 2.
        last = data[-1]
        place = last if isinstance(last, int) else ord(last)
        if place < 1 or place > 16:
            # A zero here would make data[:-place] silently empty the file.
            raise ValueError("LCP: invalid padding in decrypted resource {0}".format(path))
        data = data[:-place]
        data, _was_decompressed = self.decompress(data)
        return data
class LCPError(Exception):
    """Error raised when a book cannot be handled as an LCP-protected book."""
class LCPTransform:
    """Helpers that turn a user passphrase into the LCP user key."""

    # 64-byte master key used by the profile-1.0 transform (one round per byte).
    _PROFILE10_MASTER_KEY = "b3a07c4d42880e69398e05392405050efeea0664c0b638b7c986556fa9b58d77b31a40eb6a4fdba1e4537229d9f779daad1cc41ee968153cb71f27dc9696d40f"

    @staticmethod
    def secret_transform_basic(input_hash):
        """Return *input_hash* unchanged.

        basic profile doesn't have any transformation.
        Takes key input as hexdigest and outputs it as hexdigest.
        """
        return input_hash

    @staticmethod
    def secret_transform_profile10(input_hash):
        """Apply the profile-1.0 user key transform to a hex digest.

        This function is what the documentation describes as
        "uk = userkey(h)", the "secret userkey transform":

        1. Take the input hash.
        2. Append one byte of the master key to it.
        3. SHA-256 the result.
        4. Repeat from 2 until the master key is used up (64 rounds).

        Returns the final hash as a lowercase hex string, or None when
        *input_hash* is not a valid hex string.
        """
        masterkey = bytearray.fromhex(LCPTransform._PROFILE10_MASTER_KEY)
        try:
            current_hash = bytearray.fromhex(input_hash)
        except (TypeError, ValueError):
            # Not hex (e.g. a raw passphrase) or not a string at all.
            return None

        for byte in masterkey:
            current_hash.append(byte)
            current_hash = bytearray(hashlib.sha256(current_hash).digest())
        return binascii.hexlify(current_hash).decode("latin-1")

    @staticmethod
    def userpass_to_hash(passphrase, algorithm):
        """Hash *passphrase* with the user key *algorithm* named in the license.

        *passphrase* may be bytes or text; text is encoded as UTF-8 first.
        Returns ``(algorithm_name, hexdigest)``, or ``(None, None)`` when the
        algorithm is not supported.
        """
        # Check for the password algorithm. The Readium LCP standard only defines SHA256.
        # The hashing standard documents they link to define a couple other hash algorithms, too.
        # I've never seen them actually used in an LCP-encrypted file, so I didn't bother to implement them.
        if algorithm != "http://www.w3.org/2001/04/xmlenc#sha256":
            print("LCP: Book is using unsupported user key algorithm: {0}".format(algorithm))
            return None, None

        if not isinstance(passphrase, (bytes, bytearray)):
            # hashlib only accepts binary input.
            passphrase = passphrase.encode("utf-8")
        return "SHA256", hashlib.sha256(passphrase).hexdigest()
+
+
# Check file to see if this is an LCP-protected file
def isLCPbook(inpath):
try:
with closing(ZipFile(open(inpath, 'rb'))) as lcpbook:
- if ("META-INF/license.lcpl" not in lcpbook.namelist() or
- "META-INF/encryption.xml" not in lcpbook.namelist() or
- b"EncryptedContentKey" not in lcpbook.read("META-INF/encryption.xml")):
+ if ("META-INF/license.lcpl" not in lcpbook.namelist()):
return False
license = json.loads(lcpbook.read('META-INF/license.lcpl'))
return False
# This function decrypts data with the given key
def dataDecryptLCP(b64data, hex_key):
    """Decrypt a Base64-encoded AES-CBC value from an LCP license.

    The decoded payload is a 16-byte IV followed by the ciphertext. The last
    plaintext byte gives the number of padding bytes to strip.

    Args:
        b64data: Base64 text holding IV + ciphertext.
        hex_key: AES key as a hex string.

    Returns:
        The decrypted bytes with the padding removed.

    Raises:
        ValueError: the input is not valid Base64, is too short or not
            block-aligned, the key is not valid hex, or the padding length
            is outside 1-16 (typically a wrong key).
    """
    # Decode base64 data
    try:
        decoded_data = base64.b64decode(b64data)
    except Exception as e:
        raise ValueError(f"Invalid Base64 data: {e}") from e

    # A usable payload is the 16-byte IV plus at least one 16-byte block;
    # an IV-only payload would otherwise fail later with an IndexError.
    if len(decoded_data) < 32:
        raise ValueError("Decoded data is too short to contain a valid IV and ciphertext.")

    # Extract IV and ciphertext
    iv, cipher = decoded_data[:16], decoded_data[16:]
    if len(cipher) % 16 != 0:
        raise ValueError(f"Incorrect ciphertext length: {len(cipher)} bytes (must be a multiple of 16).")

    # Convert hex key to bytes
    try:
        key_bytes = binascii.unhexlify(hex_key)
    except (TypeError, ValueError) as e:
        raise ValueError(f"Invalid hex key: {e}") from e

    # Perform AES-CBC decryption
    aes = AES.new(key_bytes, AES.MODE_CBC, iv)
    decrypted = aes.decrypt(cipher)

    # Handle padding removal: only the last byte (the pad length) is checked.
    padding = decrypted[-1] if isinstance(decrypted[-1], int) else ord(decrypted[-1])
    if padding < 1 or padding > 16:
        raise ValueError("Invalid padding detected.")

    return decrypted[:-padding]
+
+
# Read one field ("email" or "name") of the license's user object,
# decrypting it when the license marks that field as encrypted.
def _decryptLCPUserField(user, field, user_pass):
    value = user.get(field)
    if "encrypted" in user and field in user["encrypted"]:
        if user_pass is None:
            # Encrypted, but there is no key to read it with.
            return None
        try:
            return str(dataDecryptLCP(value, user_pass).decode("utf-8"))
        except Exception:
            # Decryption failed for whatever reason; the field is optional
            # information, so treat it as unavailable.
            return None
    return value


# This function just returns an info string about the license
# Optional.
def returnUserInfoStringForLicense(license, user_pass = None):
    """Build a one-line description of the user a license was issued to.

    Args:
        license: the parsed license.lcpl document (a dict).
        user_pass: user key as a hex string, used for fields the license
            lists as encrypted; None leaves those fields out.

    Returns:
        A string such as "ID=..., Email=..., Name=..." containing the parts
        that are available, or None when the license has no user object or
        neither the name nor the email can be read.
    """
    if "user" not in license:
        return None

    user = license["user"]
    user_email = _decryptLCPUserField(user, "email", user_pass)
    user_name = _decryptLCPUserField(user, "name", user_pass)

    if user_name is None and user_email is None:
        return None

    parts = []
    if "id" in user:
        # format() also copes with an id that is not a string.
        parts.append("ID={0}".format(user["id"]))
    if user_email is not None:
        parts.append("Email={0}".format(user_email))
    if user_name is not None:
        parts.append("Name={0}".format(user_name))

    return ", ".join(parts)
+
# Collect every user key candidate worth testing: keys embedded in the
# license first, then each passphrase as given followed by its hash.
def _lcpCandidateUserKeys(lcp_license, passphrases):
    user_key = lcp_license["encryption"]["user_key"]
    candidates = []

    # Some providers hard-code the passphrase in the LCPL file. That doesn't happen often,
    # but when it does, these files can be decrypted without knowing any passphrase.
    if "value" in user_key:
        candidates.append(binascii.hexlify(base64.b64decode(user_key["value"].encode())).decode("ascii"))
    if "hex_value" in user_key:
        candidates.append(binascii.hexlify(bytearray.fromhex(user_key["hex_value"])).decode("ascii"))

    # Hash all the passwords provided by the user:
    algorithm = user_key.get("algorithm", "http://www.w3.org/2001/04/xmlenc#sha256")
    for possible_passphrase in passphrases:
        # The raw entry is kept as well, in case it already is a key.
        candidates.append(possible_passphrase)
        _algo, hashed = LCPTransform.userpass_to_hash(possible_passphrase.encode('utf-8'), algorithm)
        if hashed is not None:
            candidates.append(hashed)
    return candidates


# Return the first key (each candidate is tried as-is, then transformed)
# that decrypts the license's key_check value to the license id, else None.
def _lcpFindUserKey(lcp_license, candidates, transform_algo):
    key_check = lcp_license["encryption"]["user_key"]["key_check"]
    for candidate in candidates:
        for possible_key in (candidate, transform_algo(candidate)):
            if possible_key is None:
                continue
            try:
                decrypted = dataDecryptLCP(key_check, possible_key)
            except Exception:
                # Wrong or malformed key - move on to the next one.
                continue
            if decrypted is not None and decrypted.decode("ascii", errors="ignore") == lcp_license["id"]:
                # Found correct password hash, hooray!
                return possible_key
    return None


# Explain that no passphrase worked and print whatever recovery hints
# the license offers.
def _lcpPrintPassphraseHelp(lcp_license, tried):
    print("LCP: Tried {0} passphrases, but none of them could decrypt the book ...".format(tried))

    # Print password hint, if available
    user_key = lcp_license["encryption"]["user_key"]
    if "text_hint" in user_key and user_key["text_hint"] != "":
        print("LCP: The book distributor has given you the following passphrase hint: \"{0}\"".format(user_key["text_hint"]))

    print("LCP: Enter the correct passphrase in the DeDRM plugin settings, then try again.")

    # Print password reset instructions, if available
    for link in lcp_license.get("links", []):
        if link.get("rel") == "hint":
            print("LCP: You may be able to find or reset your LCP passphrase on the following webpage: {0}".format(link.get("href")))
            break


# Create the ZipInfo for an output entry, carrying over the metadata of
# the corresponding entry in the source archive.
def _lcpCloneZipInfo(source_info, path):
    info = ZipInfo(path)
    # The EPUB mimetype entry has to be stored uncompressed.
    info.compress_type = ZIP_STORED if path == "mimetype" else ZIP_DEFLATED
    info.date_time = source_info.date_time
    info.comment = source_info.comment
    info.extra = source_info.extra
    info.internal_attr = source_info.internal_attr
    info.external_attr = source_info.external_attr
    info.create_system = source_info.create_system
    # If the file name or the comment contains any non-ASCII char, set the UTF8-flag.
    # bytearray() yields ints for the comment on both Python 2 and 3.
    if any(ord(c) >= 128 for c in path) or any(b >= 128 for b in bytearray(info.comment)):
        info.flag_bits |= 0x800
    return info


# Takes a file and a list of passphrases
def decryptLCPbook(inpath, passphrases, parent_object):
    """Remove the LCP DRM from the book at *inpath*.

    Args:
        inpath: path of the LCP-protected archive.
        passphrases: passphrases (text) to try; may be empty.
        parent_object: object whose ``temporary_file(suffix)`` returns an
            object with a ``name`` attribute; the output is written there.

    Returns:
        The path of the decrypted file: a .pdf for a single-PDF book, a .zip
        for several PDFs or an unknown publication type, otherwise an .epub.

    Raises:
        LCPError: the file is not an LCP book, uses an unknown profile or
            content algorithm, contains no PDF although declared as one, or
            none of the candidate keys is correct.
    """
    if not isLCPbook(inpath):
        raise LCPError("This is not an LCP-encrypted book")

    passphrases = list(passphrases or [])

    # The context managers close the archive and the underlying file
    # handle on every exit path, including exceptions.
    with open(inpath, 'rb') as book_fp, ZipFile(book_fp) as book:
        lcp_license = json.loads(book.read('META-INF/license.lcpl'))
        print("LCP: Found LCP-encrypted book {0}".format(lcp_license["id"]))

        user_info_string1 = returnUserInfoStringForLicense(lcp_license, None)
        if user_info_string1 is not None:
            print("LCP: Account information: " + user_info_string1)

        # Check algorithm:
        profile = lcp_license["encryption"]["profile"]
        if profile == "http://readium.org/lcp/basic-profile":
            print("LCP: Book is using lcp/basic-profile encryption.")
            transform_algo = LCPTransform.secret_transform_basic
        elif profile == "http://readium.org/lcp/profile-1.0":
            print("LCP: Book is using lcp/profile-1.0 encryption")
            transform_algo = LCPTransform.secret_transform_profile10
        else:
            raise LCPError("Book is using an unknown LCP encryption standard: {0}".format(profile))

        content_key = lcp_license["encryption"]["content_key"]
        if "algorithm" in content_key and content_key["algorithm"] != "http://www.w3.org/2001/04/xmlenc#aes256-cbc":
            raise LCPError("Book is using an unknown LCP encryption algorithm: {0}".format(content_key["algorithm"]))

        encrypted_content_key = content_key["encrypted_value"]

        # For all the candidate keys, check if one of them decrypts the book.
        # The candidates are deliberately not printed: they are the user's
        # passphrases and keys and would end up in shared logs.
        candidates = _lcpCandidateUserKeys(lcp_license, passphrases)
        correct_password_hash = _lcpFindUserKey(lcp_license, candidates, transform_algo)

        if correct_password_hash is None:
            _lcpPrintPassphraseHelp(lcp_license, len(passphrases))
            raise LCPError("No correct passphrase found")

        print("LCP: Found correct passphrase, decrypting book ...")
        user_info_string2 = returnUserInfoStringForLicense(lcp_license, correct_password_hash)
        if user_info_string2 is not None and user_info_string1 != user_info_string2:
            print("LCP: Account information: " + user_info_string2)

        # Take the key we found and decrypt the content key:
        decrypted_content_key = dataDecryptLCP(encrypted_content_key, correct_password_hash)
        if decrypted_content_key is None:
            raise LCPError("Decrypted content key is None")

        # Begin decrypting
        names = book.namelist()
        encryption = None
        if 'META-INF/encryption.xml' in names:
            encryption = book.read('META-INF/encryption.xml')
        decryptor = Decryptor(decrypted_content_key, encryption)
        kwds = dict(compression=ZIP_DEFLATED, allowZip64=False)

        # Stays None when the license has no "publication" link; the book
        # is then exported as a generic ZIP below.
        content_type = None
        for link in lcp_license.get("links", []):
            if link.get("rel") == "publication":
                content_type = link.get("type")
                break

        if content_type in ("application/pdf+lcp", "application/pdf"):
            # Check how many PDF files there are.
            # Usually, an LCP-protected PDF/ZIP is only supposed to contain one
            # PDF file, but if there are multiple, return a ZIP that contains them all.
            pdf_files = [name for name in names if name.endswith(".pdf")]

            if not pdf_files:
                raise LCPError("Error: Book is an LCP-protected PDF, but doesn't contain any PDF files ...")

            if len(pdf_files) == 1:
                # One PDF file found - extract and return that.
                pdfdata = decryptor.decrypt(pdf_files[0], book.read(pdf_files[0]))
                outputname = parent_object.temporary_file(".pdf").name
                print("LCP: Successfully decrypted, exporting to {0}".format(outputname))
                with open(outputname, 'wb') as f:
                    f.write(pdfdata)
                return outputname

            # Multiple PDFs found
            outputname = parent_object.temporary_file(".zip").name
            with ZipFile(outputname, 'w', **kwds) as outfile:
                for path in pdf_files:
                    outfile.writestr(path, decryptor.decrypt(path, book.read(path)))

            print("LCP: Successfully decrypted a multi-PDF ZIP file, exporting to {0}".format(outputname))
            return outputname

        # Not a PDF -> EPUB
        if content_type == "application/epub+zip":
            outputname = parent_object.temporary_file(".epub").name
        else:
            outputname = parent_object.temporary_file(".zip").name

        # mimetype must be 1st file, and the license is not copied over.
        members = [name for name in names if name not in ("mimetype", "META-INF/license.lcpl")]
        if "mimetype" in names:
            members.insert(0, "mimetype")

        with ZipFile(outputname, 'w', **kwds) as outfile:
            for path in members:
                if path == "META-INF/encryption.xml":
                    # Check if that's still needed
                    if not decryptor.check_if_remaining():
                        continue
                    data = decryptor.get_xml()
                    print("LCP: Adding encryption.xml for the remaining files.")
                else:
                    data = decryptor.decrypt(path, book.read(path))
                outfile.writestr(_lcpCloneZipInfo(book.getinfo(path), path), data)

        print("LCP: Successfully decrypted, exporting to {0}".format(outputname))
        return outputname
\ No newline at end of file