Created
August 3, 2020 19:21
-
-
Save halfvector/91b99d54688d2366a71037765272c9af to your computer and use it in GitHub Desktop.
Using aws-sdk-go to pick up file changes from S3 via S3->SNS->SQS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"fmt" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/s3" | |
"github.com/aws/aws-sdk-go/service/sqs" | |
"io/ioutil" | |
"time" | |
) | |
func main() { | |
sess, err := session.NewSessionWithOptions(session.Options{ | |
Profile: "readonly-sqs-account", // profile from ~/.aws/credentials that defines aws_access_key_id and aws_secret_access_key | |
Config: aws.Config{ | |
Region: aws.String("us-west-2"), // region of the queue | |
}, | |
}) | |
if err != nil { | |
panic(err) | |
} | |
sqsSvc := sqs.New(sess) | |
s3Svc := s3.New(sess) | |
url, err := sqsSvc.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String("sqs-queue-name")}) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Printf("queue url: %v\n", *url.QueueUrl) | |
for i := 0; i < 300; i++ { | |
message, err := sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{ | |
// some attributes could be helpful for debugging, worth exploring them | |
AttributeNames: []*string{ | |
aws.String(sqs.MessageSystemAttributeNameSentTimestamp), | |
}, | |
MessageAttributeNames: []*string{ | |
aws.String(sqs.QueueAttributeNameAll), | |
}, | |
QueueUrl: url.QueueUrl, | |
MaxNumberOfMessages: aws.Int64(10), // get the max batch size | |
VisibilityTimeout: aws.Int64(15), // how long to keep this message away from any other consumers | |
WaitTimeSeconds: aws.Int64(10), // long polling, wait this long for new messages to arrive. | |
}) | |
if err != nil { | |
panic(err) | |
} | |
for _, msg := range message.Messages { | |
// unpack sns message envelope that is in the queue | |
var snsMsg SnsMessage | |
err := json.Unmarshal([]byte(*msg.Body), &snsMsg) | |
if err != nil { | |
panic(err) | |
} | |
// unpack the s3 event inside the sns envelope | |
var s3event S3Event | |
err = json.Unmarshal([]byte(snsMsg.Message), &s3event) | |
if err != nil { | |
panic(err) | |
} | |
// each s3 event can have multiple effected s3 records with various actions | |
for _, record := range s3event.Records { | |
fmt.Printf("%v - %v - %v - %.0f seconds ago\n", record.S3.Bucket.Name, record.EventName, record.S3.Object.Key, time.Since(record.EventTime).Seconds()) | |
// for some events, we might want to read the file (ie if its an xml file) | |
if record.EventName == "ObjectCreated:Put" { | |
object, err := s3Svc.GetObject(&s3.GetObjectInput{ | |
Bucket: aws.String(record.S3.Bucket.Name), | |
Key: aws.String(record.S3.Object.Key), | |
}) | |
if err != nil { | |
panic(err) | |
} | |
payload, err := ioutil.ReadAll(object.Body) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Printf("file contents: %v\n", string(payload)) | |
} | |
} | |
// the SQS way of acking a message is to delete it | |
_, err = sqsSvc.DeleteMessage(&sqs.DeleteMessageInput{ | |
QueueUrl: url.QueueUrl, | |
ReceiptHandle: msg.ReceiptHandle, | |
}) | |
if err != nil { | |
panic(err) | |
} | |
} | |
} | |
} | |
// SnsMessage is the JSON envelope that SNS places in the SQS message body.
// Only Message is consumed by main; the remaining fields mirror the envelope
// so the whole document decodes without loss.
type SnsMessage struct {
	// Envelope identification.
	Type      string `json:"Type"`
	MessageID string `json:"MessageId"`
	TopicArn  string `json:"TopicArn"`
	Subject   string `json:"Subject"`

	// Message holds the payload as a JSON-encoded string; main decodes it
	// a second time into an S3Event.
	Message string `json:"Message"`

	// Timestamp is decoded by time.Time's JSON support (RFC 3339 text).
	Timestamp time.Time `json:"Timestamp"`

	// Signature and subscription metadata carried by the envelope.
	SignatureVersion string `json:"SignatureVersion"`
	Signature        string `json:"Signature"`
	SigningCertURL   string `json:"SigningCertURL"`
	UnsubscribeURL   string `json:"UnsubscribeURL"`
}
// S3Event is the document main decodes from SnsMessage.Message. It holds a
// list of records; main prints each record and downloads the object for
// records whose EventName is "ObjectCreated:Put".
type S3Event struct {
	Records []struct {
		EventVersion string `json:"eventVersion"`
		EventSource  string `json:"eventSource"`
		AwsRegion    string `json:"awsRegion"`

		// EventTime is used by main to report how long ago the event occurred.
		EventTime time.Time `json:"eventTime"`

		// EventName is the action string main compares against
		// "ObjectCreated:Put".
		EventName string `json:"eventName"`

		UserIdentity struct {
			PrincipalID string `json:"principalId"`
		} `json:"userIdentity"`

		RequestParameters struct {
			SourceIPAddress string `json:"sourceIPAddress"`
		} `json:"requestParameters"`

		ResponseElements struct {
			XAmzRequestID string `json:"x-amz-request-id"`
			XAmzID2       string `json:"x-amz-id-2"`
		} `json:"responseElements"`

		// S3 identifies the bucket and object the record refers to.
		S3 struct {
			S3SchemaVersion string `json:"s3SchemaVersion"`
			ConfigurationID string `json:"configurationId"`

			Bucket struct {
				// Name is passed to GetObject as the bucket.
				Name string `json:"name"`

				OwnerIdentity struct {
					PrincipalID string `json:"principalId"`
				} `json:"ownerIdentity"`

				Arn string `json:"arn"`
			} `json:"bucket"`

			Object struct {
				// Key is passed to GetObject as the object key.
				Key       string `json:"key"`
				Size      int    `json:"size"`
				ETag      string `json:"eTag"`
				Sequencer string `json:"sequencer"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment