This library is compatible with Go 1.11+.
Please refer to CHANGELOG.md
if you encounter breaking changes.
When dealing with various cloud providers, a frequent use case is to move data seamlessly from one cloud resource to another. The resource could be any cloud storage or even a cloud message bus. During mirroring, data may be transformed or partitioned, all driven by declarative mirroring rules.
Google Storage to S3 Mirror
To mirror data from Google Storage that matches the /data/ prefix and '.csv.gz' suffix to s3://destBucket/data, the following rule can be used:
@gs://$configBucket/StorageMirror/Rules/rule.yaml
Source:
  Prefix: "/data/"
  Suffix: ".csv.gz"
Dest:
  URL: s3://destBucket/data
  Credentials:
    URL: gs://sourceBucket/secret/s3-cred.json.enc
    Key: projects/my_project/locations/us-central1/keyRings/my_ring/cryptoKeys/my_key
Codec: gzip
Info:
  Workflow: My workflow name here
  Description: my description
  ProjectURL: JIRA/Wiki or any link reference
  LeadEngineer: my@dot.com
To mirror data from S3 that matches the /myprefix/ prefix and '.csv.gz' suffix to gs://destBucket/data, splitting the source file into destination files of at most 8 MB, you can use the following rule:
@s3://$configBucket/StorageMirror/Rules/rule.json
[
  {
    "Source": {
      "Prefix": "/myprefix/",
      "Suffix": ".csv.gz"
    },
    "Dest": {
      "URL": "gs://${destBucket}",
      "Credentials": {
        "Parameter": "StorageMirror.GCP-DestProject",
        "Key": "smirror"
      }
    },
    "Split": {
      "MaxSize": 8388608,
      "Template": "%s_%05d"
    },
    "Codec": "gzip"
  }
]
To mirror data from Google Storage that matches the /myprefix/ prefix and '.csv' suffix to the pubsub topic 'myTopic', you can use the following rule:
@gs://$configBucket/StorageMirror/Rules/rule.json
[
  {
    "Source": {
      "Prefix": "/myprefix/",
      "Suffix": ".csv"
    },
    "Dest": {
      "Topic": "myTopic"
    },
    "Split": {
      "MaxSize": 524288
    },
    "OnSuccess": [
      {
        "Action": "delete"
      }
    ],
    "OnFailure": [
      {
        "Action": "move",
        "URL": "gs://${opsBucket}/StorageMirror/errors/"
      }
    ]
  }
]
Source data is divided on line boundaries, with the maximum size specified in the split section. Each message also gets a 'source' attribute with the source path part.
To mirror data from S3 that matches the /myprefix/ prefix and '.csv' suffix to the simple message queue 'myQueue', you can use the following rule:
@s3://$configBucket/StorageMirror/Rules/rule.json
[
  {
    "Source": {
      "Prefix": "/myprefix/",
      "Suffix": ".csv"
    },
    "Dest": {
      "Queue": "myQueue"
    },
    "Split": {
      "MaxLines": 1000,
      "Template": "%s_%05d"
    },
    "OnSuccess": [
      {
        "Action": "delete"
      }
    ],
    "OnFailure": [
      {
        "Action": "move",
        "URL": "s3://${opsBucket}/StorageMirror/errors/"
      }
    ]
  }
]
To partition data from Google Storage that matches the /data/subfolder prefix and '.csv' suffix to the topic mytopic_p$partition, you can use the following rule. The process builds the partition key from CSV field[0] with mod(2); the destination topic is dynamically evaluated based on the partition value.
@gs://$configBucket/StorageMirror/Rules/rule.json
[
  {
    "Source": {
      "Prefix": "/data/subfolder",
      "Suffix": ".csv"
    },
    "Dest": {
      "Topic": "mytopic_p$partition"
    },
    "Split": {
      "MaxSize": 1048576,
      "Partition": {
        "FieldIndex": 0,
        "Mod": 2
      }
    },
    "OnSuccess": [
      {
        "Action": "delete"
      }
    ],
    "PreserveDepth": 1
  }
]
The smirror command is a great place to start building and validating transfer/transformation rules locally.
## to validate
smirror -r=myrule.yaml -V
## to mirror from s3 to gs
smirror -s=s3://mybucket/folder -d=gs://myBucket/data
## to test rule locally
smirror -s=/tmp/data -r='myrule.yaml'
The global config delegates mirror rules to a separate location:
- Mirrors.BaseURL: mirror rule location
- Mirrors.CheckInMs: frequency to reload rules from the specified location
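For illustration, a minimal global config fragment could look like the following sketch; the bucket name and check interval are placeholders, not values from this project.

# hypothetical global config fragment; bucket name and interval are illustrative
Mirrors:
  BaseURL: gs://myConfigBucket/StorageMirror/Rules
  CheckInMs: 60000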
A typical rule defines the following matching Source and mirror destination, both defined as a Resource:
- Source.Prefix: optional matching prefix
- Source.Suffix: optional matching suffix
- Source.Filter: optional regexp matching filter
- Source.Credentials: optional source credentials
- Source.CustomKey: optional server side encryption AES key
- Dest.URL: destination base location
- Dest.Credentials: optional dest credentials
- Dest.CustomKey: optional server side encryption AES key
- DoneMarker: optional file name that triggers transfer of the holding folder.
When a done marker is specified, a file is only mirrored once the done marker file is present. Once the done marker is uploaded, the individual files in the holding folder are re-triggered.
- Dest.Proxy: optional HTTP proxy
- Dest.Proxy.URL: HTTP proxy URL
- Dest.Proxy.Fallback: flag to fall back to the regular client on proxy error
- Dest.Proxy.TimeoutMs: connection timeout
- Dest.Topic: pubsub topic
- Dest.Queue: simple message queue
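As a sketch only, a rule combining a regexp filter, a done marker, and a destination proxy could look like the fragment below, assuming the attribute placement listed above; the bucket names, filter expression, and proxy address are hypothetical.

# hypothetical rule sketch; values are placeholders
Source:
  Prefix: "/data/"
  Suffix: ".csv"
  Filter: ".+_part_\\d+\\.csv"   # regexp matching filter
DoneMarker: done.txt             # folder is mirrored only after this file arrives
Dest:
  URL: s3://destBucket/data
  Proxy:
    URL: http://proxy.example.com:3128
    Fallback: true               # fall back to the regular client on proxy error
    TimeoutMs: 5000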
When the destination is a message bus, you have to specify the split option; data is published to the destination path defined by the split template, and a 'source' attribute is added. For example, if the original resource xx://mybucket/data/p11/events.csv is divided into 2 parts, two messages are published with the data payload and the source attributes /data/p11/0001_events.csv and /data/p11/0002_events.csv respectively.
Both topic and queue support the $partition variable, which is expanded with the partition value when the Split.Partition setting is used.
The following attributes control the payload Schema:
- Schema.Format (CSV or JSON)
- Schema.Delimiter
- Schema.LazyQuotes
- Schema.FieldCount: an optional setting for CSV, where each line is adjusted (expanded/truncated) to the specified number of columns.
- Schema.MaxBadRecords: an optional setting to control the maximum number of allowed bad records (if not specified, all bad records are excluded)
- Schema.Fields: field collection with data type adjustment settings
  - Name: JSON field name
  - Position: CSV field position (if not specified, field validation/conversion is ignored)
  - DataType: adjustment data type (time, float, int, boolean, string)
  - SourceDateFormat: date format in the source field (Java-style date format: yyyy-MM-dd hh:mm:ss)
  - TargetDateFormat: target adjustment date format
  - Replace: collection of replacement rules
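A sketch of a Schema fragment using these settings; the field name, position, and formats are illustrative only.

# hypothetical Schema fragment; field names and formats are illustrative
Schema:
  Format: CSV
  Delimiter: ","
  FieldCount: 10          # each line expanded/truncated to 10 columns
  MaxBadRecords: 5
  Fields:
    - Name: created
      Position: 2
      DataType: time
      SourceDateFormat: "yyyy-MM-dd hh:mm:ss"
      TargetDateFormat: "yyyy-MM-dd"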
Optionally, the mirror process can split source content by maximum size or maximum line count.
- Split.MaxLines: maximum number of lines in a destination split file
- Split.MaxSize: maximum size of a destination split file (lines are preserved)
- Split.Template: optional template for the destination file name, with '%04d_%s' as the default value, where:
  - %d or $chunk is expanded with the split number
  - %s or $name is replaced with the file name
  - %v or $partition is replaced with the partition key value (if applicable)
- Split.Partition: allows splitting by partition
- Split.Partition.Field: name of the field (JSON/Avro)
- Split.Partition.FieldIndex: field index (CSV)
- Split.Partition.Separator: optional separator, default ',' (CSV)
- Split.Partition.Hash: optional hash producing an int64 partition value; the following are supported:
  - md5
  - fnv
  - murmur
- Split.Partition.Mod: optional modulo value for a numeric partition value
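For example, a sketch of splitting JSON input by a hashed field could look like the fragment below; the field name, hash, mod, and topic are illustrative assumptions.

# hypothetical split-by-partition fragment; field name, hash and mod are illustrative
Dest:
  Topic: myTopic_p$partition   # $partition is expanded with the partition value
Split:
  MaxLines: 10000
  Partition:
    Field: user_id
    Hash: fnv
    Mod: 4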
- Transcoding.Source: transcoding source
- Transcoding.Dest: transcoding destination
- Transcoding.PathMapping: optional path mapping
- Transcoding.Autodetect: optional source schema detection flag (currently only with XLSX)
Where Source and Dest support the following attributes:
- Format: one of the following: XLSX, CSV, JSON, AVRO
- Fields: CSV fields (field names are directly mapped to Avro unless path mapping is used)
- HasHeader: flag indicating whether the CSV has a header
- Delimiter: CSV delimiter
- LazyQuotes: flag to use CSV lazy quotes
- SchemaURL: Avro schema URL
- RecordPerBlock: Avro setting
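A sketch of a CSV-to-Avro transcoding fragment based on the attributes above; the schema URL and values are placeholders.

# hypothetical transcoding fragment; URL and values are placeholders
Transcoding:
  Source:
    Format: CSV
    HasHeader: true
    Delimiter: ","
  Dest:
    Format: AVRO
    SchemaURL: gs://myConfigBucket/schema/events.avsc
    RecordPerBlock: 100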
By default, the whole source path is copied to the destination.
- PreserveDepth: optional number controlling source path transformation; a positive number preserves that many folders from the leaf side, and a negative number truncates that many folders from the root side.
To see how PreserveDepth works, assume the following:
- source URL: xx://myXXBucket/folder/subfolder/grandsubfolder/asset.txt
- dest base URL: yy://myYYBucket/zzz
PreserveDepth | dest URL | description |
---|---|---|
Not specified | yy://myYYBucket/zzz//folder/subfolder/grandsubfolder/asset.txt | the whole source path is preserved |
1 | yy://myYYBucket/zzz/grandsubfolder/asset.txt | 1 path element preserved from the leaf side |
-2 | yy://myYYBucket/zzz/grandsubfolder/asset.txt | 2 path elements truncated from the root side |
-1 | yy://myYYBucket/zzz/subfolder/grandsubfolder/asset.txt | 1 path element truncated from the root side |
By default, if no split or replacement rules are specified, the source is copied to the destination without decompressing the source archive.
- Codec: defines the destination codec (gzip is the only option currently supported).
- Uncompress: set when split or replace options are used; you can also set this value for a zip/tar source file if you need to mirror individual archive assets.
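For example, a sketch of a rule fragment mirroring the individual assets of a zip archive; the paths are placeholders.

# hypothetical rule fragment; paths are placeholders
Source:
  Suffix: ".zip"
Dest:
  URL: gs://destBucket/data
Uncompress: true   # mirror individual archive assets rather than the archive itself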
Secrets use KMS to encrypt/decrypt GCP/AWS/Slack secrets or credentials.
- Credentials.Key: KMS key or alias name
- Credentials.Parameter: AWS Systems Manager parameter name storing encrypted secrets
- Credentials.URL: location of encrypted secrets
See the deployment details for how to secure credentials.
Server-side encryption with customer-provided encryption keys (AES-256) uses the following settings:
- CustomKey.Key: KMS key or alias name
- CustomKey.Parameter: AWS Systems Manager parameter name storing encrypted secrets
- CustomKey.URL: location of encrypted secrets
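A sketch supplying a destination customer key, assuming the same shape as the Credentials setting shown earlier; the locations and KMS key path are placeholders.

# hypothetical customer key fragment; locations are placeholders
Dest:
  URL: s3://destBucket/data
  CustomKey:
    URL: gs://myConfigBucket/secret/custom-key.json.enc
    Key: projects/my_project/locations/us-central1/keyRings/my_ring/cryptoKeys/my_key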
All security-sensitive credentials/secrets are encrypted with the KMS service: on Google Cloud Platform they are stored in Google Storage, and on Amazon Web Services in Systems Manager. See the deployment details for securing credentials.
Check the end-to-end testing scenarios for various rule examples.
To use the notify post action you have to supply encrypted Slack credentials,
where the raw (unencrypted) content uses the following JSON format:
{
  "Token": "xoxp-myslack_token"
}
- SlackCredentials.Key: KMS key or alias name
- SlackCredentials.Parameter: AWS Systems Manager parameter name storing encrypted secrets
- SlackCredentials.URL: location of encrypted secrets
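For illustration, the Slack credentials might be referenced like the sketch below; the location and KMS key are placeholders, and the exact placement in the config is an assumption.

# hypothetical fragment; location and KMS key are placeholders
SlackCredentials:
  URL: gs://myConfigBucket/secret/slack.json.enc
  Key: projects/my_project/locations/us-central1/keyRings/my_ring/cryptoKeys/my_key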
Each mirror rule accepts a collection of OnSuccess and OnFailure post actions.
The following actions are supported:
- delete: remove source (trigger asset)
{
  "OnSuccess": [{"Action": "delete"}]
}
- move: move source to specified destination
{
  "OnSuccess": [{
    "Action": "move",
    "URL": "gs://${opsBucket}/StorageMirror/processed/"
  }],
  "OnFailure": [{
    "Action": "move",
    "URL": "gs://${opsBucket}/StorageMirror/errors/"
  }]
}
- notify: notify slack
{
  "OnSuccess": [{
    "Action": "notify",
    "Title": "Transfer $SourceURL done",
    "Message": "success !!",
    "Channels": ["#e2e"],
    "Body": "$Response"
  }]
}
By default, any payload smaller than 1 GB is loaded into memory to compute a checksum (CRC/MD5) for the upload operation, which means the lambda needs enough memory. Larger content is streamed, with checksum computation skipped on upload, to reduce the memory footprint.
The following global config settings control streaming behaviour:
- Streaming.ThresholdMb: streaming option threshold
- Streaming.PartSizeMb: download stream part/chunk size
- ChecksumSkipThresholdMb: threshold above which the checksum is skipped on upload
Streaming can also be applied at the rule level.
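A sketch of the streaming settings; the values are illustrative, and the placement of ChecksumSkipThresholdMb simply follows the attribute list above (its exact nesting is an assumption).

# hypothetical streaming settings; values are illustrative
Streaming:
  ThresholdMb: 1024             # stream payloads larger than this
  PartSizeMb: 32                # download stream part/chunk size
ChecksumSkipThresholdMb: 1024   # skip checksum on upload above this size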
The following are used by storage mirror services:
Prerequisites
- $configBucket: bucket storing the storage mirror configuration and mirror rules
- $triggerBucket: bucket storing data that needs to be mirrored; upload events trigger the mirror
- $opsBucket: bucket storing errors and processed mirrors
- Mirrors.BaseURL: location storing mirror rules in YAML or JSON format.
The Deployment section details the generic storage mirror deployment.
To simplify mirroring maintenance, only one instance of storage mirror is recommended per project or region. Since a lambda/cloud function accepts only one trigger per bucket, you can use an SQS/SNS/pubsub bucket notification to propagate a source event from any bucket to the StorageMirror instance, by either invoking it directly or copying/moving the underlying resource to the trigger bucket.
When the copy/move setting is used to trigger StorageMirror events, the original bucket is added as a prefix to the resource path. For example, the source path gs://myTopicTriggerBucket/myPath/asset.txt is transformed to gs://myTriggerBucket/myTopicTriggerBucket/myPath/asset.txt when the copy/move proxy option is used.
All proxies share a simple config with a destination URL.
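As a sketch only, based on the description above, a proxy config carrying the destination URL could look like this; the field placement is an assumption and the bucket name is a placeholder.

# hypothetical proxy config sketch; field placement is an assumption
Dest:
  URL: gs://myTriggerBucket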
In case you do not own or control a bucket that holds assets to mirror, you can use a cron lambda/cloud function. It pulls an external URL and simulates a cloud storage event by copying the asset to the main trigger bucket.
StorageMonitor can be used to monitor trigger and error buckets.
On Google Cloud Platform:
curl -d @monitor.json -X POST -H "Content-Type: application/json" $monitorEndpoint
where:
- UnprocessedDuration - checks for any unprocessed data files older than the specified time
- ErrorRecency - checks for errors within the specified time
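A minimal monitor.json sketch, assuming the parameters above map directly to top-level JSON fields in the same style as the replay request shown below; the values are illustrative and any additional required fields (such as bucket locations) are omitted here.

{
  "UnprocessedDuration": "1hour",
  "ErrorRecency": "1hour"
}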
On Amazon Web Service:
endly monitor.yaml authWith=aws-e2e
@monitor.yaml
Sometimes during regular operation a cloud function or lambda may terminate with an error, leaving an unprocessed file. The replay function moves data back and forth between the trigger and replay buckets, triggering another event. Each replayed file leaves a trace in the replay bucket to ensure no more than one replay per file.
curl -d @replay.json -X POST -H "Content-Type: application/json" $replayEndpoint
{
  "TriggerURL": "gs://${triggerBucket}",
  "ReplayBucket": "${replayBucket}",
  "UnprocessedDuration": "1hour"
}
where:
- UnprocessedDuration - checks for any unprocessed data files older than the specified time
Serverless restrictions
- execution time
- network restriction
- memory limitation
- Endly e2e runner or endly docker image
- Google secrets for dedicated e2e project ~/.secret/gcp-e2e.json
- AWS secrets for dedicated e2e account ~/.secret/aws-e2e.json
git clone https://github.com/viant/smirror.git
cd smirror/e2e
### Update the mirror buckets for both S3 and Google Storage in e2e/run.yaml (gsTriggerBucket, s3TriggerBucket)
endly
The source code is made available under the terms of the Apache License, Version 2, as stated in the file LICENSE.
Individual files may be made available under their own specific license, all compatible with Apache License, Version 2. Please see individual files for details.
Library Author: Adrian Witas