blocks-gcs-proxy is a proxy for MAGELLAN BLOCKS concurrent batch board
.
Download the file from https://github.com/groovenauts/blocks-gcs-proxy/releases and put it somewhere on PATH.
Create config.json
like this:
{
"job": {
"subscription": "projects/proj-dummy-999/subscriptions/pipeline01-job-subscription"
},
"progress": {
"topic": "projects/proj-dummy-999/topics/pipeline01-progress-topic"
}
}
Run blocks-gcs-proxy
with command
blocks-gcs-proxy COMMAND ARGS...
blocks-gcs-proxy
calls COMMAND
with ARGS
for each message from Pubsub subscription
specified by job.subscription
in config.json
.
Key | Type | Required | Default | Description |
---|---|---|---|---|
job | map | False | ||
job.error_response | string | False | ack |
Response type on error. It must be one of {ack, nack, none} |
job.interval_on_error | int | False | 0 | The interval time in second to return response on error |
job.pull_interval | int | False | 10 | The interval time in second to pull when it gets no job message. |
job.subscription | string | False | projects/{{ .GCP_PROJECT }}/subscriptions/{{ .PIPELINE }}-job-subscription |
The subscription name to pull job messages |
job.sustainer | map | False | ||
job.sustainer.delay | int | False | See Sustainer | The new deadline in second to extend deadline to ack |
job.sustainer.disabled | bool | False | See Sustainer | Disable sustainer if it's true |
job.sustainer.interval | int | False | See Sustainer | The interval in second to send the message which extends deadline to ack |
job_check | map | False | ||
job_check.method | string | True | "none" | Method to check job before running. You can set one of none , buntdb or gcslock |
job_check.database | string | False | The database name to store job execution data. The usage depends on method |
|
job_check.bucket | string | False | The bucket name to store job execution data. The usage depends on method |
|
job_check.timeout | string | False | The timeout expression like '1h10m10s'. The usage depends on method |
|
progress | map | False | ||
progress.attributes | map[string]string | False | {} | Static attributes of progress notification message |
progress.level | string | False | info |
Log level to publish job progress. You can set one of debug , info , warn , error , fatal and panic . |
progress.topic | string | False | projects/{{ .GCP_PROJECT }}/topics/{{ .PIPELINE }}-progress-topic |
The topic name to publish job progress messages |
log | map | False | ||
log.command_severity | string | False | info |
The Log severity of command outputs. You can set one of debug , info , warn , error , fatal and panic . |
log.level | string | False | info |
Log level of processing of blocks-gcs-proxy . You can set one of debug , info , warn , error , fatal and panic . |
log.stackdriver | map | False | ||
log.stackdriver.error_reporting_service | string | False | The service name of ServiceContext | |
log.stackdriver.labels | map[string]string | True | The labels of Monitored resource | |
log.stackdriver.log_name | string | True | The resource name of the log that will receive the log entries | |
log.stackdriver.project_id | string | True | GCP Project ID | |
log.stackdriver.type | string | True | The type of Monitored resource | |
command | map | False | ||
command.dryrun | bool | False | false |
Don't run the command if this is true. |
command.options | map[key][]string | False | Define if you have to run one of multiple command. See Multiple command options for more detail. | |
download | map | False | ||
download.allow_irregular_url | bool | False | False | Allow not strict URL to download |
download.worker | map | False | ||
download.worker.max_tries | int | False | 0 | The number of tries to download. |
download.worker.workers | int | False | 1 | The number of thread to download. |
upload | map | False | ||
upload.content_type_by_ext | bool | False | Set content type by file extension when uploading to GCS | |
upload.worker | map | False | ||
upload.worker.max_tries | int | False | 0 | The number of tries to upload. |
upload.worker.workers | int | False | 1 | The number of thread to upload. |
If you have commands data in your config.json like the following:
{
"commands": {
"options": {
"key1": ["cmd1", "%{download_files}"],
"key2": ["cmd2", "%{download_files.bar}", "%{uploads_dir}", "%{download_files.baz}"]
}
}
}
And when you run the command by
$ blocks-gcs-proxy %{attrs.foo}
you can choose which command is executed by message attribute named foo
at runtime.
message attributes | command magellan-gcs-proxy calls |
---|---|
{"foo": "key1"} |
cmd1 %{download_files} |
{"foo": "key2"} |
cmd2 %{download_files.bar} %{uploads_dir} %{download_files.baz} |
If the attribute value is not defined in commands keys, the message is ignored with error message.
When your command takes longer time than AckDeadline
of the pipeline job subscription,
Sustainer sends requests to the subscription to expand the deadline.
If you don't set job.sustainer
in your config.json
, blocks-gcs-proxy
sets them
from the subscription's AckDeadline
.
Key | Default |
---|---|
job.sustainer.delay | Subscription's AckDeadline |
job.sustainer.interval | Subscription's AckDeadline * 0.8 |
Check the config.json
is valid.
You can give other file with --config
or -c
option.
$ ./blocks-gcs-proxy check -c config2.json
Error to load config.json.bak cause of invalid character '}' looking for beginning of object key string
NAME:
blocks-gcs-proxy download - Download the files from GCS to downloads directory
USAGE:
blocks-gcs-proxy download [command options] [arguments...]
OPTIONS:
--downloads_dir value, -d value Path to the directory which has bucket_name/path/to/file
--downloaders value, -n value Number of downloaders (default: 6)
$ ./blocks-gcs-proxy download --help
$ ./blocks-gcs-proxy download -d tmp/downloads -n 5 gs://bucket1/path/to/file1 gs://bucket1/path/to/file2 gs://bucket1/path/to/file3
$ ./blocks-gcs-proxy upload --help
NAME:
blocks-gcs-proxy upload - Upload the files under uploads directory
USAGE:
blocks-gcs-proxy upload [command options] [arguments...]
OPTIONS:
--uploads_dir value, -d value Path to the directory which has bucket_name/path/to/file
--uploaders value, -n value Number of uploaders (default: 6)
$ ./blocks-gcs-proxy upload -d tmp/uploads -n 5