Implement Aleph bulk upload as an aggregation operation
sunu opened this issue · 7 comments
Whenever a scraper uses `balkhash_emit` to put entities into a Balkhash dataset, we want to push all those entities to an Aleph collection after the crawler has finished running.
I wonder if we should also move the `aleph_emit` operation into the `alephclient` package.
@pudo I see that the `aleph_emit` method was moved to alephclient: https://github.com/alephdata/memorious/pull/69/files I wonder if there is a nice way to schedule exporting of crawled lists from memorious to Aleph datasets using the memorious scheduling system, rather than relying on the alephclient command and some external scheduling mechanism (cron or whatever else).
Hi @thimios, you can define an aggregator function in a crawler's config. The function will run once the crawler has finished. It can be any Python function with the signature `function(context, params)`.
For example, we have written an aggregator function to push FtM entities made by a memorious crawler into Aleph in bulk. Here's the config file: https://github.com/alephdata/opensanctions/blob/6928fd94efee1027935bb40aa701a7bfa731f155/opensanctions/config/au_dfat_sanctions.yml#L17
Here's what the function itself looks like: https://github.com/alephdata/balkhash/blob/master/balkhash/memorious.py#L43
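To illustrate the shape such a function takes, here is a minimal hypothetical sketch (this is not the actual balkhash implementation linked above; the attribute names on `context` and the `foreign_id` param are assumptions for illustration):

```python
# Hypothetical minimal aggregator: any callable with the signature
# function(context, params) can be referenced from a crawler config's
# `aggregator` block. `params` carries the config's params mapping,
# or None if the block has no params.
def example_aggregator(context, params):
    params = params or {}
    # Pick the target dataset from the config params, falling back to
    # the crawler's own name (attribute names here are assumptions):
    dataset = params.get("foreign_id") or context.crawler.name
    context.log.info("Crawl finished, pushing dataset: %s", dataset)
    return dataset
```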
@sunu thanks for the pointers. In order for that function to work, I would also need alephclient installed on the worker container:

```
pip install alephclient
```

and the two environment variables set:

```
ALEPHCLIENT_HOST: http://aleph_api_1:5000
ALEPHCLIENT_API_KEY: 047df233f8cb4b96bfd23b25a1bf2xxx
```

Right? Or is there another way to configure the host and API key for that function?
Also, I do not need a Postgres container for opensanctions if I only use crawlers that push to Aleph, right?
> In order for that function to work I would also need to have alephclient installed on the worker container with `pip install alephclient` and also set the two env variables `ALEPHCLIENT_HOST` and `ALEPHCLIENT_API_KEY`, right? Or is there another way to configure the host and api key for that function?
Yes, that should do it. Make sure the Aleph host is actually reachable from your crawler worker container; Docker's networking can make that complicated sometimes.
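One way to make the Aleph API reachable is to attach the worker to Aleph's Docker network via a compose override. This is only a sketch; the service name (`worker`), the Aleph API hostname (`api`), and the network name (`aleph_default`) are assumptions that must match your actual deployment:

```yaml
# Hypothetical docker-compose.override.yml for the opensanctions stack.
services:
  worker:
    environment:
      # Hostname as seen from inside Aleph's network (assumption):
      ALEPHCLIENT_HOST: http://api:5000
      ALEPHCLIENT_API_KEY: ${ALEPHCLIENT_API_KEY}
    networks:
      - aleph_default

networks:
  # Join the network created by the Aleph compose project (assumption):
  aleph_default:
    external: true
```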
> Also I do not need a postgres container for opensanctions if I only use crawlers that push to aleph, right?
In the case of OpenSanctions, we use balkhash to store FtM entity fragments before pushing them to Aleph. Balkhash works with multiple backends, Postgres being one of them. If you don't use Postgres, balkhash will fall back to LevelDB. You would have to change https://github.com/alephdata/opensanctions/blob/6928fd94efee1027935bb40aa701a7bfa731f155/setup.py#L19 to `balkhash[leveldb]` to install the appropriate dependencies.
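Concretely, the change is to the extras spec in the package's dependency list; a sketch of the relevant fragment (the real setup.py lists additional dependencies):

```python
# Fragment of setup.py: swap the balkhash extra so the LevelDB backend
# dependencies are installed instead of the Postgres ones.
install_requires=[
    "balkhash[leveldb]",  # was: balkhash[postgres]
],
```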
It seems that it is not working. I have put the two containers in the same Docker network, so on the opensanctions worker container shell I can do:

```
INFO:alephclient.cli:[un_sc_sanctions] Bulk load entities: 1000...
INFO:alephclient.cli:[un_sc_sanctions] Bulk load entities: 2000...
/memorious #
```
I have configured all memorious crawlers to use the aleph bulkpush aggregator:
```yaml
name: us_bis_denied
description: "[OSANC] US BIS Denied Persons List"
schedule: daily
pipeline:
  init:
    method: seed
    params:
      url: 'https://www.bis.doc.gov/dpl/dpl.txt'
    handle:
      pass: fetch
  fetch:
    method: fetch
    handle:
      pass: parse
  parse:
    method: opensanctions.crawlers.us_bis_denied:parse
aggregator:
  method: balkhash.memorious:aleph_bulkpush
```
and I have also tried specifying the foreign ID of the Aleph dataset:
```yaml
name: interpol_red_notices
description: "[OSANC] Interpol Red Notices"
schedule: daily
pipeline:
  init:
    method: seed
    params:
      url: 'https://www.interpol.int/en/How-we-work/Notices/View-Red-Notices'
    handle:
      pass: fetch
  fetch:
    method: fetch
    handle:
      pass: index
  index:
    method: opensanctions.crawlers.interpol_red_notices:index
    handle:
      pass: fetch_notices
  fetch_notices:
    method: fetch
    handle:
      pass: parse_noticelist
  parse_noticelist:
    method: opensanctions.crawlers.interpol_red_notices:parse_noticelist
    handle:
      pass: fetch_notice
  fetch_notice:
    method: fetch
    handle:
      pass: parse_notice
  parse_notice:
    method: opensanctions.crawlers.interpol_red_notices:parse_notice
aggregator:
  method: balkhash.memorious:aleph_bulkpush
  params:
    foreign_id: 26e399bfc91a4000893985f95b23830e
```
but the datasets are not getting filled, either by letting the crawlers follow their schedule or by running them manually:
```
INFO:us_bis_denied.init:[us_bis_denied->init(seed)]: 2cbb539567714df5bbeca2a984443e69
INFO:us_bis_denied.fetch:[us_bis_denied->fetch(fetch)]: 2cbb539567714df5bbeca2a984443e69
INFO:us_bis_denied.fetch:Fetched [200]: 'https://www.bis.doc.gov/dpl/dpl.txt'
INFO:us_bis_denied.parse:[us_bis_denied->parse(opensanctions.crawlers.us_bis_denied:parse)]: 2cbb539567714df5bbeca2a984443e69
/memorious #
```
I do not get any explanatory logs on the console or in the Docker logs... Any ideas what I should look into?