Spaces:
Running
Running
| import sys | |
| import os | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import re | |
| import urllib | |
| import time | |
| import shutil | |
def get_main_url(url):
    """Return the scheme-and-host prefix of *url*, e.g. 'https://example.org'.

    'https://host/path/x' splits on '/' into ['https:', '', 'host', 'path', 'x'];
    rejoining the first three pieces yields the site root without a trailing slash.
    """
    parts = url.split("/")
    return "/".join(parts[:3])
def save_pdf_from_url(pdf_url, directory, name, headers):
    """Download *pdf_url* and save it as <directory>/<name>.pdf.

    If the response body is not a PDF but looks like PMC's
    'Preparing to download' interstitial HTML page, retry once through the
    Europe PMC render service using the PMC id embedded in *pdf_url*.

    All request failures are caught and reported on stdout; nothing is raised.
    """
    try:
        # Fixed: a timeout so a stalled server cannot hang the whole run.
        response = requests.get(pdf_url, headers=headers,
                                allow_redirects=True, timeout=60)
        response.raise_for_status()
        if not response.content.startswith(b'%PDF'):
            # Not a PDF: check whether PMC served its interstitial page.
            content_str = response.content.decode('utf-8', errors='ignore')
            if 'Preparing to download' in content_str:
                pmc_match = re.search(r'PMC\d+', pdf_url)
                if pmc_match:
                    pmc_id = pmc_match.group()
                    alt_url = f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={pmc_id}&blobtype=pdf"
                    print(f"** Trying alternative URL: {alt_url}")
                    response = requests.get(alt_url, headers=headers,
                                            allow_redirects=True, timeout=60)
                    response.raise_for_status()
        if not response.content.startswith(b'%PDF'):
            # Still not a PDF: keep the old save-anyway behavior, but warn
            # instead of silently writing HTML into a .pdf file.
            print(f"** Warning: content saved for {name} does not look like a PDF")
        with open(os.path.join(directory, f'{name}.pdf'), 'wb') as f:
            f.write(response.content)
        print(f"** Successfully fetched and saved PDF for PMCID {name}. File size: {len(response.content)} bytes")
    except requests.RequestException as e:
        print(f"** Failed to download PDF from {pdf_url}: {e}")
def fetch(pmcid, finders, name, headers, error_pmids, args):
    """Fetch the article page for *pmcid* and try each finder in order
    until one yields a PDF URL, then download it into args['out'].

    finders     : list of finder-function names (resolved in this module's
                  namespace) or callables taking (req, soup, headers).
    error_pmids : open file; failed (pmcid, name) pairs are appended to it.

    Skips work entirely when the target .pdf already exists.
    """
    uri = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmcid.strip()}"
    success = False
    if os.path.exists(f"{args['out']}/{pmcid}.pdf"):
        print(f"** Reprint #{pmcid} already downloaded and in folder; skipping.")
        return
    try:
        req = requests.get(uri, headers=headers)
        req.raise_for_status()
        soup = BeautifulSoup(req.content, 'lxml')
        for finder in finders:
            print(f"Trying {finder}")
            # Fixed: resolve finder names via a namespace lookup instead of
            # eval(); callables are used as-is. Same behavior for the string
            # names this script passes, without eval's injection risk.
            finder_fn = finder if callable(finder) else globals()[finder]
            pdf_url = finder_fn(req, soup, headers)
            if pdf_url:
                save_pdf_from_url(pdf_url, args['out'], name, headers)
                success = True
                break
        if not success:
            print(f"** Reprint {pmcid} could not be fetched with the current finders.")
            error_pmids.write(f"{pmcid}\t{name}\n")
    except requests.RequestException as e:
        print(f"** Request failed for PMCID {pmcid}: {e}")
        error_pmids.write(f"{pmcid}\t{name}\n")
def acs_publications(req, soup, headers):
    """Finder for ACS Publications: anchors titled 'high-res pdf'/'low-res pdf'."""
    for anchor in soup.find_all('a'):
        title = anchor.get('title')
        if title and ('high-res pdf' in title.lower() or 'low-res pdf' in title.lower()):
            print("** Using ACS Publications finder...")
            return get_main_url(req.url) + anchor.get('href')
    return None
def future_medicine(req, soup, headers):
    """Finder for Future Medicine: first anchor whose href contains '/doi/pdf'."""
    matches = soup.find_all('a', attrs={'href': re.compile("/doi/pdf")})
    if not matches:
        return None
    print("** Using Future Medicine finder...")
    return get_main_url(req.url) + matches[0].get('href')
def generic_citation_labelled(req, soup, headers):
    """Generic finder: the <meta name="citation_pdf_url"> tag many publishers emit."""
    meta_tags = soup.find_all('meta', attrs={'name': 'citation_pdf_url'})
    if not meta_tags:
        return None
    print("** Using Generic Citation Labelled finder...")
    return meta_tags[0].get('content')
def nejm(req, soup, headers):
    """Finder for NEJM: anchors marked with data-download-type='article pdf'."""
    for anchor in soup.find_all('a'):
        download_type = anchor.get('data-download-type')
        if download_type and download_type.lower() == 'article pdf':
            print("** Using NEJM finder...")
            return get_main_url(req.url) + anchor.get('href')
    return None
def pubmed_central_v2(req, soup, headers):
    """Finder for PubMed Central: first anchor pointing under /pmc/articles."""
    candidates = soup.find_all('a', attrs={'href': re.compile('/pmc/articles')})
    if not candidates:
        return None
    print("** Using PubMed Central V2 finder...")
    return f"https://www.ncbi.nlm.nih.gov{candidates[0].get('href')}"
def science_direct(req, soup, headers):
    """Finder for Science Direct: follow the intermediate redirect form, then
    read the citation_pdf_url meta tag from the landing page.

    Best-effort: any failure (missing <input>, request error, parse error)
    is reported and results in None rather than propagating.
    """
    try:
        # The first <input> on the interstitial page carries a URL-encoded
        # redirect target in its 'value' attribute.
        redirect_target = urllib.parse.unquote(soup.find_all('input')[0].get('value'))
        landing = requests.get(redirect_target, allow_redirects=True, headers=headers)
        landing.raise_for_status()
        landing_soup = BeautifulSoup(landing.content, 'lxml')
        meta_tags = landing_soup.find_all('meta', attrs={'name': 'citation_pdf_url'})
        if meta_tags:
            print("** Using Science Direct finder...")
            return meta_tags[0].get('content')
    except Exception as e:  # deliberate best-effort: one bad site must not abort the run
        print(f"** Science Direct finder error: {e}")
    return None
def uchicago_press(req, soup, headers):
    """Finder for UChicago Press: anchors with both 'pdf' and '.edu/doi/' in the href."""
    for anchor in soup.find_all('a'):
        href = anchor.get('href')
        if href and 'pdf' in href and '.edu/doi/' in href:
            print("** Using UChicago Press finder...")
            return get_main_url(req.url) + anchor.get('href')
    return None
def europe_pmc_service(req, soup, headers):
    """Finder that skips scraping entirely: build a Europe PMC render-service
    URL straight from the PMC accession id embedded in the request URL."""
    match = re.search(r'PMC\d+', req.url)
    if match is None:
        return None
    accession = match.group()
    print(f"** Using Europe PMC Service finder for {accession}...")
    return f"https://europepmc.org/backend/ptpmcrender.fcgi?accid={accession}&blobtype=pdf"
def main(pcds):
    """Fetch PDFs for the comma-separated PMCID string *pcds*, zip the
    output directory, and return the path of the zip archive.

    The literal '%#$' acts throughout as a "value not provided" sentinel.
    Returns None (after printing an error) when neither a PMCID string nor
    a PMCID file is available.
    """
    args = {
        'pmcids': f'{pcds}',              # PMCIDs to download, comma separated
        'pmf': '%#$',                     # path of a PMCID list file, if used instead
        'out': 'fetched_pdfs',            # output directory
        'errors': 'unfetched_pmcids.tsv', # failures written here as pmcid<TAB>name
        'maxRetries': 3,
        'batch': 10,                      # pause after this many articles
        'delay': 5                        # seconds to sleep between batches
    }
    if args['pmcids'] == '%#$' and args['pmf'] == '%#$':
        print("Error: 必须提供 pmcids 或 pmf")
        return
    if args['pmcids'] != '%#$' and args['pmf'] != '%#$':
        # Both inputs supplied: the explicit PMCID string wins, the file is ignored.
        print("Warning: 同时提供了 pmcids 和 pmf,忽略 pmf")
        args['pmf'] = '%#$'
    if not os.path.exists(args['out']):
        print(f"创建输出目录: {args['out']}")
        # Fixed: makedirs(exist_ok=True) is race-safe and creates parents.
        os.makedirs(args['out'], exist_ok=True)
    headers = requests.utils.default_headers()
    # Some publishers block the default python-requests agent string.
    headers['User-Agent'] = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'
    if args['pmcids'] != '%#$':
        pmcids = args['pmcids'].split(",")
        names = pmcids
    else:
        # File mode: one PMCID per line, optionally followed by a name.
        # Fixed: the file handle was previously opened and never closed.
        with open(args['pmf']) as pmf_file:
            pmcids = [line.strip().split() for line in pmf_file]
        if len(pmcids[0]) == 1:
            pmcids = [x[0] for x in pmcids]
            names = pmcids
        else:
            names = [x[1] for x in pmcids]
            pmcids = [x[0] for x in pmcids]
    # Finder names, tried in this order by fetch(); cheaper/more reliable first.
    finders = [
        'europe_pmc_service',
        'generic_citation_labelled',
        'pubmed_central_v2',
        'acs_publications',
        'uchicago_press',
        'nejm',
        'future_medicine',
        'science_direct'
    ]
    batch_count = 0
    with open(args['errors'], 'w+') as error_pmids:
        for pmcid, name in zip(pmcids, names):
            print(f"Trying to fetch PMCID {pmcid.strip()}")
            retries_so_far = 0
            while retries_so_far < args['maxRetries']:
                try:
                    fetch(pmcid, finders, name, headers, error_pmids, args)
                    retries_so_far = args['maxRetries']  # success: exit the retry loop
                except requests.ConnectionError as e:
                    # Errno 104 (connection reset by peer) is transient: retry.
                    if '104' in str(e):
                        retries_so_far += 1
                        if retries_so_far < args['maxRetries']:
                            print(f"** Retry {retries_so_far}/{args['maxRetries']} for {pmcid} due to error {e}")
                        else:
                            print(f"** Max retries reached for {pmcid}")
                            error_pmids.write(f"{pmcid}\t{name}\n")
                    else:
                        print(f"** Connection error for {pmcid}: {e}")
                        retries_so_far = args['maxRetries']
                        error_pmids.write(f"{pmcid}\t{name}\n")
                except Exception as e:
                    # Any other failure is final for this PMCID; record and move on.
                    print(f"** General error for {pmcid}: {e}")
                    retries_so_far = args['maxRetries']
                    error_pmids.write(f"{pmcid}\t{name}\n")
            batch_count += 1
            if batch_count % args['batch'] == 0:
                # Throttle to avoid hammering the publishers' servers.
                print(f"** Batch limit reached. Sleeping for {args['delay']} seconds...")
                time.sleep(args['delay'])
    # Once everything is fetched, pack the output directory.
    zip_path = args['out'] + ".zip"
    shutil.make_archive(args['out'], 'zip', args['out'])
    return zip_path  # path of the created zip archive