106 lines
2.8 KiB
Python
106 lines
2.8 KiB
Python
#!/usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
|
"""A script to process items from a redis queue."""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import pprint
|
|
import sys
|
|
import time
|
|
|
|
from scrapy_redis import get_redis
|
|
|
|
logger = logging.getLogger("process_items")
|
|
|
|
|
|
def process_items(r, keys, timeout, limit=0, log_every=1000, wait=0.1):
|
|
"""Process items from a redis queue.
|
|
|
|
Parameters
|
|
----------
|
|
r : Redis
|
|
Redis connection instance.
|
|
keys : list
|
|
List of keys to read the items from.
|
|
timeout: int
|
|
Read timeout.
|
|
|
|
"""
|
|
limit = limit or float("inf")
|
|
processed = 0
|
|
while processed < limit:
|
|
# Change ``blpop`` to ``brpop`` to process as LIFO.
|
|
ret = r.blpop(keys, timeout)
|
|
# If data is found before the timeout then we consider we are done.
|
|
if ret is None:
|
|
time.sleep(wait)
|
|
continue
|
|
|
|
source, data = ret
|
|
try:
|
|
item = json.loads(data)
|
|
except Exception:
|
|
logger.exception("Failed to load item:\n%r", pprint.pformat(data))
|
|
continue
|
|
|
|
try:
|
|
name = item.get("name") or item.get("title")
|
|
url = item.get("url") or item.get("link")
|
|
logger.debug("[%s] Processing item: %s <%s>", source, name, url)
|
|
except KeyError:
|
|
logger.exception(
|
|
"[%s] Failed to process item:\n%r", source, pprint.pformat(item)
|
|
)
|
|
continue
|
|
|
|
processed += 1
|
|
if processed % log_every == 0:
|
|
logger.info("Processed %s items", processed)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("key", help="Redis key where items are stored")
|
|
parser.add_argument("--host")
|
|
parser.add_argument("--port")
|
|
parser.add_argument("--timeout", type=int, default=5)
|
|
parser.add_argument("--limit", type=int, default=0)
|
|
parser.add_argument("--progress-every", type=int, default=100)
|
|
parser.add_argument("-v", "--verbose", action="store_true")
|
|
|
|
args = parser.parse_args()
|
|
|
|
params = {}
|
|
if args.host:
|
|
params["host"] = args.host
|
|
if args.port:
|
|
params["port"] = args.port
|
|
|
|
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
|
|
|
|
r = get_redis(**params)
|
|
host = r.connection_pool.get_connection("info").host
|
|
logger.info("Waiting for items in '%s' (server: %s)", args.key, host)
|
|
kwargs = {
|
|
"keys": [args.key],
|
|
"timeout": args.timeout,
|
|
"limit": args.limit,
|
|
"log_every": args.progress_every,
|
|
}
|
|
try:
|
|
process_items(r, **kwargs)
|
|
retcode = 0 # ok
|
|
except KeyboardInterrupt:
|
|
retcode = 0 # ok
|
|
except Exception:
|
|
logger.exception("Unhandled exception")
|
|
retcode = 2
|
|
|
|
return retcode
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|