はじめに
Pythonでスクレイピングを実行したくて、ChatGPTに頼んでコードを書いてもらったということを以下の記事に書きました。
また、これまでRでスクレイピングを実行する方法についていくつか記事を書いてきましたが、1つずつファイルをダウンロード、ZIPファイルを解凍、みたいな流れで実行していたところ、今回は複数のファイルを並列してダウンロードできる方法を学ぶことができました。
そのコード、手順についてまとめていきたいと思います。
環境
Windows 11
Python 3.12.3
ダウンロードするページ
Rのコードを紹介したページと同じで、国勢調査をもとに公開しているメッシュごとの人口データをダウンロードします。URLは以下です。
コード
# ====== Standard library ======
import re
import shutil
import threading
import zipfile
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urljoin
# ====== Third-party ======
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from tqdm import tqdm
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# ==============================
# Settings
# ==============================
# 保存先ディレクトリのパス(存在しない場合は作成)
= Path(r"data\250813_scraping_python")
OUTPUT_DIR =True, exist_ok=True)
OUTPUT_DIR.mkdir(parents
# ベースURL(相対パスを補完するために使用)
= "https://www.e-stat.go.jp"
BASE_URL
# 並列処理設定
= 6 # 同時DL数(回線速度・先方負荷に応じて調整)
MAX_WORKERS = 60 # タイムアウト時間(秒)
REQUEST_TIMEOUT
# HTTPリクエスト時のUser-Agent(アクセス元をブラウザっぽく偽装)
= (
USER_AGENT "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126 Safari/537.36"
)
# ==============================
# Selenium setup
# ==============================
def create_chrome_driver() -> webdriver.Chrome:
"""Headless Chromeのドライバーを作成"""
= Options()
options
# ヘッドレスモード(ブラウザ画面を表示せずにバックグラウンドで動作)
"--headless=new")
options.add_argument(# GPU無効化(描画関連の不要な処理を省く)
"--disable-gpu")
options.add_argument(# ウィンドウサイズ指定(幅×高さ)
"--window-size=1280,1800")
options.add_argument(# 自動化検知を回避(アクセスブロック対策)
"--disable-blink-features=AutomationControlled")
options.add_argument(# User-Agentを上書き(アクセス元をブラウザっぽく偽装)
f"user-agent={USER_AGENT}")
options.add_argument(
# Chromeドライバーを生成
= webdriver.Chrome(options=options)
driver # ページ読み込みのタイムアウト(秒)
60)
driver.set_page_load_timeout(# JavaScript実行のタイムアウト(秒)
60)
driver.set_script_timeout(
return driver
# ==============================
# Dynamic page loader
# ==============================
def fetch_rendered_html(
str,
url:
driver: webdriver.Chrome,str] = None,
wait_for_css: Optional[int = 20,
timeout: -> str:
) """
JavaScript実行後のHTMLを取得する関数
必要に応じて、特定のCSSセレクタがDOMに出現するまで待機
"""
# 指定したURLにアクセス
driver.get(url)
# ページの読み込み完了まで待機
WebDriverWait(driver, timeout).until(lambda d: d.execute_script("return document.readyState") == "complete"
)
# もしCSSセレクタが指定されていれば、その要素が表示されるまで待機
# (JSによる非同期描画の完了を保証するため)
if wait_for_css:
WebDriverWait(driver, timeout).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, wait_for_css))
)
# 最終的なページのHTMLソースを返す
return driver.page_source
# ==============================
# Link extraction
# ==============================
def find_csv_links(html: str, pattern: str = r"data\?statsId=T001141") -> list[str]:
"""
HTMLから指定パターンに合致するリンク(href属性)を抽出する関数。
Parameters
----------
html : str
取得済みのHTML文字列
pattern : str
hrefがこのパターンにマッチするリンクだけを抽出
デフォルトは 'data?statsId=T001141' を含むリンク。
Returns
-------
list[str]
マッチしたhrefのリスト。
"""
# HTML文字列をBeautifulSoupでパース
= BeautifulSoup(html, "html.parser")
soup
# <a>タグの中でhref属性を持つものをすべて取得し、
# さらにhrefが正規表現patternに一致するものだけを抽出
return [
"href"]
a[for a in soup.find_all("a", href=True)
if re.search(pattern, a["href"])
]
# ==============================
# Thread-local requests.Session
# ==============================
# スレッドごとに独立したデータを持てる入れ物を作成
= threading.local()
_thread_local
def get_session() -> requests.Session:
"""
スレッドローカルに保持されるHTTPセッションを取得する関数。
接続プーリング、リトライ設定、統一User-Agentを適用。
- スレッドローカル: 各スレッドで独立したセッションを保持するための仕組み。
- 接続プーリング: 同じホストへの接続を再利用して効率化。
- リトライ設定: 一時的なエラー(429, 500, 502, 503, 504)に対して最大3回まで再試行。
- User-Agent統一: リクエストの送信元情報を統一してサーバー側の認識を安定化。
"""
if getattr(_thread_local, "session", None) is None:
# 新規セッション作成
= requests.Session()
s "User-Agent": USER_AGENT})
s.headers.update({
# リトライ戦略の設定
= Retry(
retries =3, # 最大リトライ回数
total=0.8, # リトライ間隔の指数的増加係数
backoff_factor=(429, 500, 502, 503, 504), # 再試行対象のHTTPステータスコード
status_forcelist=("GET",), # リトライ対象HTTPメソッド
allowed_methods
)
# 接続アダプタの設定(プーリングとリトライ)
= HTTPAdapter(
adapter =MAX_WORKERS, # 同時接続プール数
pool_connections=MAX_WORKERS, # プールの最大接続数
pool_maxsize=retries, # 上記リトライ戦略を適用
max_retries
)"http://", adapter)
s.mount("https://", adapter)
s.mount(
# スレッドローカルに保存
= s
_thread_local.session
return _thread_local.session
# ==============================
# ZIP download & extract (worker)
# ==============================
def fetch_zip_and_extract_txt(task: Tuple[str, str]) -> tuple[str, bool, str]:
"""
指定URLからZIPをダウンロードし、中の最初の'.txt'ファイルを
'pop_mesh{code}.txt' という名前で保存する関数。
戻り値:
(url, success, message)
url : ダウンロード元のURL
success : 成功した場合はTrue、失敗はFalse
message : 実行結果の簡易説明
"""
= task
full_url, code # 一時保存用のZIPファイルパス
= OUTPUT_DIR / f"pop_mesh{code}.zip"
zip_path # 最終的に保存するテキストファイルパス
= OUTPUT_DIR / f"pop_mesh{code}.txt"
txt_path
# すでにtxtが存在する場合はスキップ
if txt_path.exists():
return full_url, True, "skipped (exists)"
try:
# セッションを取得(接続の再利用などを行う)
= get_session()
session
# ZIPをダウンロード(ストリーミングで少しずつ書き込み)
with session.get(full_url, stream=True, timeout=REQUEST_TIMEOUT) as resp:
resp.raise_for_status()with open(zip_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# ZIPファイルでなければ削除して終了
if not zipfile.is_zipfile(zip_path):
=True)
zip_path.unlink(missing_okreturn full_url, False, "not a zip file"
= False
found_txt with zipfile.ZipFile(zip_path, "r") as zf:
for info in zf.infolist():
if info.is_dir(): # ディレクトリはスキップ
continue
if info.filename.lower().endswith(".txt"):
# 直接上書きせず一時ファイルに保存 → 後でリネーム(競合回避)
= txt_path.with_suffix(".txt.part")
tmp_path with zf.open(info) as src, open(tmp_path, "wb") as dst:
shutil.copyfileobj(src, dst)
tmp_path.replace(txt_path)= True
found_txt break
# ZIPは不要になったので削除
=True)
zip_path.unlink(missing_ok
# TXTが見つからなかった場合
if not found_txt:
return full_url, False, "no .txt in zip"
return full_url, True, "ok"
except Exception as e:
# エラー時はZIPを削除
try:
=True)
zip_path.unlink(missing_okexcept Exception:
pass
return full_url, False, f"error: {e}"
# ==============================
# Main
# ==============================
def main() -> None:
"""一覧ページを巡回→リンク収集→並列ダウンロード。"""
= create_chrome_driver() # Chromeドライバー生成
driver try:
list[tuple[str, str]] = [] # ダウンロード対象の (URL, code) タプルを格納
tasks:
# ページ番号 1〜8 を巡回してリンクを収集
for i in tqdm(range(1, 9), desc="ページ巡回"):
# ページごとのURL生成
= (
url f"https://www.e-stat.go.jp/gis/statmap-search?page={i}"
"&type=1&toukeiCode=00200521&toukeiYear=2020&aggregateUnit=H"
"&serveyId=H002005112020&statsId=T001141&datum=2011"
)# JavaScriptレンダリング後のHTMLを取得
= fetch_rendered_html(
html
url,
driver,="a[href*='data?statsId=T001141']", # このCSSが出現するまで待機
wait_for_css=25,
timeout
)# CSVリンク抽出
= find_csv_links(html)
csv_links # 相対パスを絶対URLへ変換
= [urljoin(BASE_URL, link) for link in csv_links]
full_urls
# URLとcodeをセットで保存
for full_url, csv_link in zip(full_urls, csv_links):
= re.search(r"code=([0-9]+)", csv_link)
m = m.group(1) if m else "unknown"
code
tasks.append((full_url, code))
# 成功・失敗・スキップ件数カウンタ
= 0
successes = 0
failures = 0
skipped
# 並列ダウンロード処理
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
= [ex.submit(fetch_zip_and_extract_txt, t) for t in tasks]
futures for fut in tqdm(as_completed(futures), total=len(futures), desc="ダウンロード"):
= fut.result()
url, ok, msg if ok and msg.startswith("ok"):
+= 1
successes elif ok and "skipped" in msg:
+= 1
skipped else:
+= 1
failures
# 処理結果を表示
print(f"\nDone. ok={successes}, skipped={skipped}, failed={failures}")
finally:
# ブラウザを閉じる
driver.quit()
if __name__ == "__main__":
main()
ざっくり解説
Setting, Selenium setupなど、いくつかのブロックに分かれているので、各ブロックでどのようなことをしているのか、ざっくり確認していきます。
Settings
スクレイピングに必要な設定をしています。URLは場合に合わせて変える必要があります。
MAX_WORKERS
の部分でいくつ並列でダウンロードするかを設定しているのが今回のミソだと思います。6つ平行することで高速化を図っています。
USER_AGENT
では、どのブラウザ、環境からアクセスしているのかを設定するもので、デフォルトだと自動化ツールからのアクセスと判断され、アクセスが拒否される可能性があるので、それを防ぐために設定をしています。
Selenium setup
細かい設定はコメントで書いている通りではあるのですが、ブラウザを自動操作できる環境を整えている部分です。
ブラウザをバックグラウンドで動かしたり、無駄な描画はなくしたりと、効率的な設定をここで指定しています。
Dynamic page loader
ここがRでいうところのread_html_live()
に当たる部分です。
指定されたURLにアクセスして、HTMLソースを返すといった作業をしています。
コメントを読んでいただくと、JavaScriptにも対応しているのがお分かりになるかと思います。
Link extraction
これはRでいうところのhtml_attrs()
やhtml_elements()
に該当するようなことをしています。
a
タグを探してきて、その中に含まれるhref
を引っ張ってくるということをしています。href
にはダウンロードのためのリンクが含まれています。
a
タグの探し方などについては、こちらで簡単に説明しています。
Thread-local requests.Session
ここでは、各スレッドごとに独立したHTTPセッションを用意しています。
これにより、並列ダウンロード時に接続が混線せず安全に動作し、さらにセッションの接続再利用(コネクションプール)によって通信が効率化されます。
またリトライ設定やUser-Agentの統一もここでまとめて適用しているので、安定性と速度の両立ができます。
ZIP download & extract (worker)
ここではZIPファイルのダウンロードと解凍の設定をしています。
今回はダウンロードするものの中身はテキストファイルなので.txt
として解凍するように設定していますが、ここはダウンロードするものに合わせて変更する必要があります。
変なものをダウンロードしてしまった場合の対処や、既にファイルが存在する場合の対処もしているので、ミスにもある程度対応できるようになっています。エラーメッセージも表示されるようにしているので、何のエラーが起きたのかはわかるようになっています。
Main
その名の通りメインのコードです。
今回はページ数が1~8まであるとわかっていますので、range(1, 9)
と書かれています。Rと違うので戸惑ったのですが、Pythonでは開始値は含み、終了値は含まないようです。なので1と9が記されています。
流れとしては、ページを巡回してダウンロード先のURLを保存し、巡回し終わったら並列でそれらをダウンロードしていきます。ここが並列になっているので、ダウンロードの速度が向上しているというわけです。
ダウンロード後にはダウンロードに成功したもの、既に存在してスキップしたもの、失敗したものの件数を出力するようにしています。
最後にはdriver.quit()
でブラウザを終了させ、プロセスやメモリが残らないようになっています。
最後のif
文
1番最後の行は「このファイルを直接実行したときだけmain()
を動かす」という役割を持っています。
Pythonでは同じファイルをスクリプトとして実行することも、ライブラリとして他のプログラムから読み込むこともできますが、この仕組みによって「直接実行した場合だけスクレイピング処理を走らせ、importされた場合には余計な処理をしない」ようにしています。
おわりに
今回はChatGPTに手伝ってもらいながら、RでのスクレイピングコードをPythonに書き換え、簡単にまとめました。
自分もPythonは勉強を始めたばかりではあるので認識の誤り等あるかもしれませんが、コメントからお寄せいただければ幸いです。
個人的には業務の効率化を図るうえでPythonでのスクレイピングを活用していきたいと思っているので、引き続きアップデートなどあればまた記事にしていきたいと思います。