SQS FIFO черги SNS Pub/Sub: Системи на основі подій

Сервіси обміну повідомленнями AWS: Глибоке занурення в SQS, SNS та Kinesis

Вступ

Роз'єднані архітектури є важливими для створення масштабованих, стійких хмарних застосунків. AWS надає три основні сервіси обміну повідомленнями для різних шаблонів комунікації: Amazon SQS для асинхронного обміну повідомленнями на основі черг, Amazon SNS для pub/sub та розсилки сповіщень, а також Amazon Kinesis для потокової передачі даних реального часу та аналітики.

Цей посібник досліджує архітектуру, конфігурацію та production шаблони для кожного сервісу. Ми розглянемо стандартні черги SQS проти FIFO з чергами недоставлених повідомлень, підписки на теми SNS з фільтрацією повідомлень та потоки даних Kinesis з шаблонами масштабування споживачів.

Серія AWS Developer Certification

📚 Переглянути повний посібник AWS Developer Certification - Опануйте всі 7 частин з нашим комплексним шляхом навчання.

Це Частина III (Поточна стаття) нашого комплексного 7-частинного посібника для AWS розробників:

  1. Частина I: IAM EC2 та Auto Scaling
  2. Частина II: RDS Aurora та DynamoDB
  3. Частина III: SQS SNS та Kinesis (Поточна стаття)
  4. Частина IV: Lambda API Gateway
  5. Частина V: ECS Fargate та IaC
  6. Частина VI: Cognito KMS Security
  7. Частина VII: CodePipeline та Monitoring

← Частина II: RDS Aurora та DynamoDB | Частина IV: Lambda API Gateway →


Amazon SQS (Simple Queue Service)

Архітектура SQS

SQS надає повністю керовані черги повідомлень для роз'єднання компонентів застосунку з доставкою принаймні один раз.

   Message Consumers   

   Amazon SQS   

   Message Producers   

   send   

   send   

   send   

   poll   

   poll   

   poll   

   max retries   

  Application 1  

  Application 2  

  Application 3  

  SQS Queue  

  Message Buffer  

  Dead Letter Queue  

  Failed Messages  

  Consumer 1  

  Consumer 2  

  Consumer 3  

Ключові функції:

  • Доставка принаймні один раз (Standard) або обробка рівно один раз (FIFO)
  • Утримання повідомлень: від 1 хвилини до 14 днів (за замовчуванням 4 дні)
  • Розмір повідомлення: до 256 KB (використовуйте S3 для більших)
  • Таймаут видимості: запобігає дублюванню обробки (за замовчуванням 30с)
  • Довгий polling: зменшує порожні відповіді (до 20с очікування)

SQS Standard проти FIFO черг

Функція Standard Queue FIFO Queue
Пропускна здатність Необмежена TPS 300 TPS (3,000 з батчингом)
Порядок Найкраще намагання впорядкування Строге впорядкування
Доставка Принаймні один раз Обробка рівно один раз
Випадок використання Висока пропускна здатність, дублікати OK Робочі процеси з критичним порядком
Іменування Будь-яке ім'я Має закінчуватися на .fifo

Конфігурація SQS з Terraform

resource "aws_sqs_queue" "orders" {
  name                       = "orders-queue.fifo"
  fifo_queue                 = true
  content_based_deduplication = true

  # Налаштування повідомлень
  message_retention_seconds  = 1209600  # 14 днів
  visibility_timeout_seconds = 300      # 5 хвилин

  # Черга недоставлених повідомлень
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_dlq.arn
    maxReceiveCount     = 3
  })

  # Шифрування
  sqs_managed_sse_enabled = true
}

resource "aws_sqs_queue" "orders_dlq" {
  name                      = "orders-dlq.fifo"
  fifo_queue                = true
  message_retention_seconds = 1209600
}

Шаблони обробки повідомлень SQS

Довгий Polling (Рекомендовано):

import boto3
import json

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue.fifo'

def process_messages():
    while True:
        # Довгий polling з 20-секундним очікуванням
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,  # Довгий polling
            VisibilityTimeout=300,
            MessageAttributeNames=['All']
        )

        messages = response.get('Messages', [])

        for message in messages:
            try:
                # Обробити повідомлення
                body = json.loads(message['Body'])
                process_order(body)

                # Видалити після успішної обробки
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=message['ReceiptHandle']
                )
            except Exception as e:
                print(f"Помилка обробки повідомлення: {e}")
                # Повідомлення повертається в чергу після таймауту видимості

Пакетні операції:

# Надсилання пакету (до 10 повідомлень)
entries = [
    {
        'Id': str(i),
        'MessageBody': json.dumps({'order_id': i}),
        'MessageGroupId': 'orders',  # Вимога FIFO
        'MessageDeduplicationId': str(uuid.uuid4())
    }
    for i in range(10)
]

sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)

Amazon SNS (Simple Notification Service)

Архітектура SNS

SNS надає pub/sub обмін повідомленнями для шаблонів розсилки та доставки через багато протоколів.

   Subscribers   

   Amazon SNS   

   Publishers   

   publish   

   publish   

   publish   

   deliver   

   invoke   

   send   

   send   

   POST   

  Application  

  Lambda Function  

  CloudWatch Alarm  

  SNS Topic  

  Message Fan-out  

  SQS Queue  

  Lambda  

  Email  

  SMS  

  HTTP Endpoint  

Підтримувані протоколи:

  • SQS - Обробка на основі черг
  • Lambda - Безсерверна обробка подій
  • HTTP/HTTPS - Доставка webhook
  • Email/Email-JSON - Сповіщення
  • SMS - Текстові повідомлення
  • Mobile Push - iOS, Android, Firebase

SNS з шаблоном розсилки SQS

resource "aws_sns_topic" "orders" {
  name = "order-events"

  # Увімкнути шифрування
  kms_master_key_id = aws_kms_key.sns.id
}

# Підписка 1: Сервіс інвентаризації
resource "aws_sns_topic_subscription" "inventory" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.inventory.arn

  # Фільтрація повідомлень
  filter_policy = jsonencode({
    event_type = ["order_placed", "order_cancelled"]
  })
}

# Підписка 2: Сервіс аналітики
resource "aws_sns_topic_subscription" "analytics" {
  topic_arn = aws_sns_topic.orders.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.analytics.arn
}

# Політика черги SQS для дозволу SNS
resource "aws_sqs_queue_policy" "inventory" {
  queue_url = aws_sqs_queue.inventory.url

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Principal = { Service = "sns.amazonaws.com" }
      Action = "sqs:SendMessage"
      Resource = aws_sqs_queue.inventory.arn
      Condition = {
        ArnEquals = { "aws:SourceArn" = aws_sns_topic.orders.arn }
      }
    }]
  })
}

Фільтрація повідомлень SNS

import boto3
import json

sns = boto3.client('sns')

# Публікація з атрибутами для фільтрації
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:order-events',
    Message=json.dumps({
        'order_id': '12345',
        'amount': 99.99,
        'status': 'placed'
    }),
    MessageAttributes={
        'event_type': {'DataType': 'String', 'StringValue': 'order_placed'},
        'priority': {'DataType': 'String', 'StringValue': 'high'},
        'region': {'DataType': 'String', 'StringValue': 'us-east-1'}
    }
)

Приклад політики фільтрації:

{
  "event_type": ["order_placed", "order_shipped"],
  "priority": ["high"],
  "region": [{ "prefix": "us-" }]
}

Amazon Kinesis Data Streams

Архітектура Kinesis

Kinesis забезпечує обробку потокових даних реального часу з впорядкованими записами, що можна переглядати.

   Data Consumers   

   Kinesis Data Stream   

   Data Producers   

   put records   

   put records   

   put records   

   process   

   query   

   batch load   

  Web App  

  IoT Devices  

  Log Aggregator  

  Shard 1  

  1 MB/s write  

  Shard 2  

  1 MB/s write  

  Shard 3  

  1 MB/s write  

  Lambda  

  Real-time  

  Kinesis Analytics  

  SQL Queries  

  Kinesis Firehose  

  S3/Redshift  

Ключові концепції:

  • Shard: Базова одиниця потужності (1 MB/s або 1,000 записів/с запис, 2 MB/s читання)
  • Partition Key: Визначає призначення шарда (використовуйте ключі з високою кардинальністю)
  • Sequence Number: Унікальний для шарда, зберігає порядок
  • Retention: 24 години за замовчуванням (до 365 днів з розширеним утриманням)
  • Consumers: Спільні (2 MB/s на шард) або Enhanced Fan-out (2 MB/s на споживача)

Конфігурація Kinesis Stream

resource "aws_kinesis_stream" "events" {
  name             = "application-events"
  shard_count      = 3
  retention_period = 168  # 7 днів

  shard_level_metrics = [
    "IncomingBytes",
    "IncomingRecords",
    "OutgoingBytes",
    "OutgoingRecords"
  ]

  stream_mode_details {
    stream_mode = "PROVISIONED"  # або ON_DEMAND
  }

  encryption_type = "KMS"
  kms_key_id      = aws_kms_key.kinesis.id
}

Kinesis Producer та Consumer

Kinesis Producer (Python):

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis')

def send_event(user_id, event_type, data):
    kinesis.put_record(
        StreamName='application-events',
        Data=json.dumps({
            'user_id': user_id,
            'event_type': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'data': data
        }),
        PartitionKey=str(user_id)  # Забезпечує, що той самий користувач йде в той самий шард
    )

# Пакетні записи (до 500 записів)
records = [
    {
        'Data': json.dumps({'user_id': i, 'action': 'click'}),
        'PartitionKey': str(i)
    }
    for i in range(100)
]

kinesis.put_records(StreamName='application-events', Records=records)

Lambda Consumer:

import json
import base64

def lambda_handler(event, context):
    for record in event['Records']:
        # Дані Kinesis закодовані base64
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)

        print(f"Обробка: {data}")

        # Обробити подію
        process_event(data)

    return {'statusCode': 200}

def process_event(data):
    # Бізнес-логіка
    user_id = data['user_id']
    event_type = data['event_type']
    # ... обробка даних

Порівняння та випадки використання

Коли використовувати кожен сервіс

Сервіс Випадок використання Шаблон
SQS Standard Асинхронна обробка з високою пропускною здатністю, дублікати OK Роз'єднання на основі черг
SQS FIFO Робочі процеси з критичним порядком, обробка рівно один раз Впорядкована обробка черг
SNS Розсилка до багатьох підписників, доставка через багато протоколів Pub/sub сповіщення
SNS + SQS Надійна розсилка з буферизацією черг Шаблон розсилки + черги
Kinesis Аналітика реального часу, впорядковане потокове передавання, можливість перегляду Обробка потоків

Архітектурні шаблони

Шаблон 1: Роз'єднані мікросервіси (SQS)

  • Producer надсилає повідомлення в SQS
  • Consumer опитує та обробляє асинхронно
  • DLQ обробляє невдалі повідомлення

Шаблон 2: Розсилка подій (SNS + SQS)

  • Publisher надсилає в тему SNS
  • Декілька черг SQS підписуються на тему
  • Кожен сервіс обробляє незалежно

Шаблон 3: Аналітика реального часу (Kinesis)

  • Producers передають події в Kinesis
  • Lambda обробляє в реальному часі
  • Firehose архівує в S3/Redshift

Production найкращі практики

Найкращі практики SQS

  1. Таймаут видимості: Встановіть в 6x від вашого часу обробки
  2. Dead Letter Queue: Завжди налаштовуйте з maxReceiveCount 3-5
  3. Довгий Polling: Увімкніть WaitTimeSeconds=20 для зменшення витрат
  4. Батчинг: Використовуйте пакетні операції для пропускної здатності (максимум 10 повідомлень)
  5. Ідемпотентність: Обробляйте дублікати повідомлень (використовуйте ID дедуплікації для FIFO)

Найкращі практики SNS

  1. Фільтрація повідомлень: Використовуйте політики фільтрації для зменшення непотрібних доставок
  2. Політики повторів: Налаштуйте повторні спроби доставки для HTTP/HTTPS endpoints
  3. DLQ для Lambda: Налаштуйте DLQ для невдалих викликів Lambda
  4. Шифрування: Увімкніть шифрування KMS для конфіденційних даних
  5. Raw Message Delivery: Увімкніть для підписників SQS, щоб уникнути обгортки SNS

Найкращі практики Kinesis

  1. Partition Keys: Використовуйте ключі з високою кардинальністю (ID користувача, ID пристрою) для розподілу навантаження
  2. Масштабування шардів: Моніторте метрики IncomingBytes та IncomingRecords
  3. Enhanced Fan-out: Використовуйте для багатьох споживачів (виділені 2 MB/s на споживача)
  4. Checkpointing: Використовуйте KCL (Kinesis Client Library) для автоматичного checkpointing
  5. Обробка помилок: Налаштуйте повторні спроби Lambda та DLQ для обробки потоків

Моніторинг та усунення неполадок

Ключові метрики для моніторингу

SQS:

  • ApproximateNumberOfMessagesVisible (глибина черги)
  • ApproximateAgeOfOldestMessage (затримка обробки)
  • NumberOfMessagesSent / NumberOfMessagesDeleted

SNS:

  • NumberOfMessagesPublished
  • NumberOfNotificationsFailed
  • NumberOfNotificationsDelivered

Kinesis:

  • IncomingBytes / IncomingRecords (пропускна здатність producer)
  • GetRecords.IteratorAgeMilliseconds (затримка consumer)
  • WriteProvisionedThroughputExceeded (throttling)

Поради для іспиту

Ключові концепції:

  1. SQS FIFO вимагає суфікса .fifo та має ліміт 300 TPS (3,000 з батчингом)
  2. SNS не може доставляти в SQS FIFO - використовуйте SNS Standard → SQS FIFO
  3. Впорядкування Kinesis за шардом (використовуйте той самий ключ партиції для впорядкування)
  4. Dead Letter Queues для налагодження невдалих повідомлень, а не для логіки повторів

Поширені сценарії:

  • "Роз'єднати мікросервіси" → SQS
  • "Розсилка до багатьох сервісів" → SNS + SQS
  • "Аналітика кліків реального часу" → Kinesis Data Streams
  • "Обробка рівно один раз по порядку" → SQS FIFO
  • "Перегляд подій з 3 днів тому" → Kinesis (з розширеним утриманням)

Висновок

Сервіси обміну повідомленнями AWS надають спеціалізовані рішення для різних шаблонів комунікації. SQS забезпечує асинхронне роз'єднання з обміном повідомленнями на основі черг, SNS надає pub/sub розсилку до багатьох підписників, а Kinesis забезпечує потокове передавання реального часу для аналітики та обробки подій.

Обирайте SQS для надійної асинхронної обробки, SNS для трансляції подій до багатьох споживачів та Kinesis для впорядкованих потоків даних, що можна переглядати. Розуміння гарантій впорядкування повідомлень, обмежень пропускної здатності та шаблонів інтеграції є важливим як для production архітектур, так і для іспиту AWS Certified Developer Associate.


Часті запитання

П: Чим відрізняється SQS Standard від SQS FIFO?

SQS Standard надає доставку щонайменше один раз з найкращим зусиллям впорядкування та необмеженою пропускною здатністю. SQS FIFO гарантує доставку рівно один раз з точним впорядкуванням, але обмежена 300 транзакцій/сек (3000 з батчингом). Використовуйте Standard для високої пропускної здатності, FIFO для критичного впорядкування.

П: Як працює SNS fan-out до SQS?

SNS отримує одне повідомлення та доставляє копії до кількох підписників SQS черг одночасно. Це роз'єднує видавців від споживачів, дозволяючи кожному сервісу обробляти повідомлення незалежно своїм темпом. Використовуйте для розсилки подій, обробки паралельних конвеєрів або архітектур на основі подій.

П: Що таке Kinesis Data Streams shards?

Shard - це одиниця потужності в Kinesis, що забезпечує 1MB/сек запису та 2MB/сек читання. Кожна shard зберігає впорядковану послідовність записів з унікальними номерами послідовності. Використовуйте ключі партиції для розподілу даних між shards. Додайте shards для масштабування пропускної здатності, розділіть або об'єднайте для регулювання.

П: Коли використовувати DLQ для SQS?

Dead Letter Queues обробляють повідомлення, що постійно не обробляються після maxReceiveCount спроб. Використовуйте DLQ для ізоляції проблемних повідомлень для налагодження, запобігання блокуванню черги отруйними повідомленнями та аналізу шаблонів відмов. Встановіть redrive policy на вихідній черзі, вказуючи DLQ ARN та maxReceiveCount.

П: Як працює long polling в SQS?

Long polling дозволяє ReceiveMessage запитам чекати повідомлення, а не повертати порожні відповіді, зменшуючи порожні відповіді та витрати. Встановіть WaitTimeSeconds до 20 секунд на черзі або запиті. Long polling більш економічний та зменшує запити API порівняно з short polling з негайними відповідями.

П: Що таке Kinesis Data Firehose проти Data Streams?

Data Streams - це керований сервіс низької затримки для власної обробки даних потоку з періодом утримання. Firehose - це повністю керований сервіс, що автоматично навантажує потокові дані в S3, Redshift, Elasticsearch або Splunk з можливим перетворенням Lambda. Використовуйте Streams для користувацької обробки, Firehose для простого завантаження в сховища даних.

П: Як працює SNS message filtering?

Політики фільтрації підписки SNS дозволяють підписникам отримувати тільки підмножину повідомлень на основі атрибутів повідомлення. Визначте JSON політику фільтрації з умовами відповідності (exact, prefix, numeric). SNS фільтрує на стороні сервісу перед доставкою, зменшуючи небажаний трафік та обробку. Ідеально для маршрутизації подій та систем на основі подій.