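"""ppp.py — batch-download open-access PDFs for a list of PMCIDs.

For each PMCID the script loads the PubMed Central article page, runs a chain
of publisher-specific "finder" functions until one yields a direct PDF URL,
saves the file into an output directory, and finally zips that directory.
"""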
import sys
import os
import requests
from bs4 import BeautifulSoup
import re
import urllib.parse  # used by science_direct() for urllib.parse.unquote
import time
import shutil
def get_main_url(url):
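    # Keep scheme and host only, e.g. "https://host/a/b" -> "https://host".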
return "/".join(url.split("/")[:3])
def save_pdf_from_url(pdf_url, directory, name, headers):
try:
response = requests.get(pdf_url, headers=headers, allow_redirects=True)
response.raise_for_status()
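        # A genuine PDF starts with the %PDF magic bytes; anything else is
        # usually an HTML interstitial page rather than the document itself.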
if not response.content.startswith(b'%PDF'):
content_str = response.content.decode('utf-8', errors='ignore')
if 'Preparing to download' in content_str:
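                # Fall back to the Europe PMC render service, which returns
                # the PDF directly for a given PMC accession number.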
pmc_match = re.search(r'PMC\d+', pdf_url)
if pmc_match:
pmc_id = pmc_match.group()
alt_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
print(f"** Trying alternative URL: {alt_url}")
response = requests.get(alt_url, headers=headers, allow_redirects=True)
response.raise_for_status()
with open(f'{directory}/{name}.pdf', 'wb') as f:
f.write(response.content)
print(f"** Successfully fetched and saved PDF for PMCID {name}. File size: {len(response.content)} bytes")
except requests.RequestException as e:
print(f"** Failed to download PDF from {pdf_url}: {e}")
def fetch(pmcid, finders, name, headers, error_pmids, args):
uri = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmcid.strip()}"
success = False
    # PDFs are saved as {name}.pdf (see save_pdf_from_url), so check that path.
    if os.path.exists(f"{args['out']}/{name}.pdf"):
print(f"** Reprint #{pmcid} already downloaded and in folder; skipping.")
return
try:
req = requests.get(uri, headers=headers)
req.raise_for_status()
soup = BeautifulSoup(req.content, 'lxml')
        # Try each finder in order until one returns a usable PDF URL.
        for finder in finders:
            print(f"Trying {finder.__name__}")
            pdf_url = finder(req, soup, headers)
if pdf_url:
save_pdf_from_url(pdf_url, args['out'], name, headers)
success = True
break
if not success:
print(f"** Reprint {pmcid} could not be fetched with the current finders.")
error_pmids.write(f"{pmcid}\t{name}\n")
except requests.RequestException as e:
print(f"** Request failed for PMCID {pmcid}: {e}")
error_pmids.write(f"{pmcid}\t{name}\n")
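# Finder functions: each takes the article-page response, its parsed soup and
# the request headers, and returns a direct PDF URL when it recognises the
# publisher's page layout, or None so the next finder can be tried.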
def acs_publications(req, soup, headers):
links = [x for x in soup.find_all('a') if x.get('title') and ('high-res pdf' in x.get('title').lower() or 'low-res pdf' in x.get('title').lower())]
if links:
print("** Using ACS Publications finder...")
return get_main_url(req.url) + links[0].get('href')
return None
def future_medicine(req, soup, headers):
links = soup.find_all('a', attrs={'href': re.compile("/doi/pdf")})
if links:
print("** Using Future Medicine finder...")
return get_main_url(req.url) + links[0].get('href')
return None
def generic_citation_labelled(req, soup, headers):
links = soup.find_all('meta', attrs={'name': 'citation_pdf_url'})
if links:
print("** Using Generic Citation Labelled finder...")
return links[0].get('content')
return None
def nejm(req, soup, headers):
links = [x for x in soup.find_all('a') if x.get('data-download-type') and x.get('data-download-type').lower() == 'article pdf']
if links:
print("** Using NEJM finder...")
return get_main_url(req.url) + links[0].get('href')
return None
def pubmed_central_v2(req, soup, headers):
links = soup.find_all('a', attrs={'href': re.compile('/pmc/articles')})
if links:
print("** Using PubMed Central V2 finder...")
return f"https://www.ncbi.nlm.nih.gov{links[0].get('href')}"
return None
def science_direct(req, soup, headers):
try:
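        # ScienceDirect serves an interstitial page whose first <input> holds
        # the real article URL; follow it, then read the citation_pdf_url tag.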
new_uri = urllib.parse.unquote(soup.find_all('input')[0].get('value'))
req = requests.get(new_uri, allow_redirects=True, headers=headers)
req.raise_for_status()
soup = BeautifulSoup(req.content, 'lxml')
links = soup.find_all('meta', attrs={'name': 'citation_pdf_url'})
if links:
print("** Using Science Direct finder...")
return links[0].get('content')
except Exception as e:
print(f"** Science Direct finder error: {e}")
return None
def uchicago_press(req, soup, headers):
links = [x for x in soup.find_all('a') if x.get('href') and 'pdf' in x.get('href') and '.edu/doi/' in x.get('href')]
if links:
print("** Using UChicago Press finder...")
return get_main_url(req.url) + links[0].get('href')
return None
def europe_pmc_service(req, soup, headers):
pmc_match = re.search(r'PMC\d+', req.url)
if pmc_match:
pmc_id = pmc_match.group()
print(f"** Using Europe PMC Service finder for {pmc_id}...")
return f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
return None
def main(pcds):
    args = {
        'pmcids': f'{pcds}',              # comma-separated PMCIDs to download
        'pmf': '%#$',                     # path to a PMCID list file; '%#$' means unset
        'out': 'fetched_pdfs',            # output directory for downloaded PDFs
        'errors': 'unfetched_pmcids.tsv', # log of PMCIDs that could not be fetched
        'maxRetries': 3,                  # retries on connection resets
        'batch': 10,                      # sleep after this many fetch attempts
        'delay': 5                        # seconds to sleep between batches
    }
if args['pmcids'] == '%#$' and args['pmf'] == '%#$':
print("Error: 必须提供 pmcids 或 pmf")
return
if args['pmcids'] != '%#$' and args['pmf'] != '%#$':
print("Warning: 同时提供了 pmcids 和 pmf,忽略 pmf")
args['pmf'] = '%#$'
if not os.path.exists(args['out']):
print(f"创建输出目录: {args['out']}")
os.mkdir(args['out'])
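    # Some publishers reject the default python-requests User-Agent, so
    # present a desktop-browser UA string instead.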
headers = requests.utils.default_headers()
headers['User-Agent'] = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
if args['pmcids'] != '%#$':
pmcids = args['pmcids'].split(",")
names = pmcids
    else:
        with open(args['pmf']) as pmf_file:
            rows = [line.strip().split() for line in pmf_file if line.strip()]
        if len(rows[0]) == 1:
            # one column: PMCID only, reused as the output file name
            pmcids = [x[0] for x in rows]
            names = pmcids
        else:
            # two columns: PMCID followed by a custom output name
            names = [x[1] for x in rows]
            pmcids = [x[0] for x in rows]
    # Finder functions, tried in order; each returns a direct PDF URL or None.
    finders = [
        europe_pmc_service,
        generic_citation_labelled,
        pubmed_central_v2,
        acs_publications,
        uchicago_press,
        nejm,
        future_medicine,
        science_direct
    ]
batch_count = 0
with open(args['errors'], 'w+') as error_pmids:
for pmcid, name in zip(pmcids, names):
print(f"Trying to fetch PMCID {pmcid.strip()}")
retries_so_far = 0
            while retries_so_far < args['maxRetries']:
                try:
                    fetch(pmcid, finders, name, headers, error_pmids, args)
                    break  # fetched (or failure already logged); stop retrying
                except requests.ConnectionError as e:
                    if '104' in str(e):  # errno 104: connection reset by peer
                        retries_so_far += 1
                        if retries_so_far < args['maxRetries']:
                            print(f"** Retry {retries_so_far}/{args['maxRetries']} for {pmcid} due to error {e}")
                        else:
                            print(f"** Max retries reached for {pmcid}")
                            error_pmids.write(f"{pmcid}\t{name}\n")
                    else:
                        print(f"** Connection error for {pmcid}: {e}")
                        error_pmids.write(f"{pmcid}\t{name}\n")
                        break
                except Exception as e:
                    print(f"** General error for {pmcid}: {e}")
                    error_pmids.write(f"{pmcid}\t{name}\n")
                    break
batch_count += 1
if batch_count % args['batch'] == 0:
print(f"** Batch limit reached. Sleeping for {args['delay']} seconds...")
time.sleep(args['delay'])
    # After downloading, zip the output directory and return the archive path.
    zip_path = shutil.make_archive(args['out'], 'zip', args['out'])
    return zip_path
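# A minimal sketch of a command-line entry point, assuming the script can be
# run directly (the original exposes main() for external callers); the argv
# handling below is illustrative, not part of the original interface.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: python ppp.py PMC1234567,PMC7654321")
        sys.exit(1)
    print(f"Archive written to: {main(sys.argv[1])}")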