Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e-cli/e2e-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": "python",
"test_suites": "basic",
"test_suites": "basic,retry",
"auto_settings": false,
"patch": null,
"env": {}
Expand Down
15 changes: 11 additions & 4 deletions e2e-cli/src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def run(input_json: str, debug: bool):
"""Run the E2E CLI with the given input configuration."""
logger = setup_logging(debug)
output = {"success": False, "sentBatches": 0, "error": None}
delivery_errors = []

def on_error(error, batch):
delivery_errors.append(str(error))

try:
data = json.loads(input_json)
Expand All @@ -96,6 +100,7 @@ def run(input_json: str, debug: bool):
write_key=write_key,
host=api_host,
debug=debug,
on_error=on_error,
upload_size=flush_at,
upload_interval=flush_interval,
max_retries=max_retries,
Expand All @@ -120,10 +125,12 @@ def run(input_json: str, debug: bool):
client.flush()
client.join()

output["success"] = True
# Note: We don't have easy access to batch count from the SDK internals
# This could be enhanced if needed
output["sentBatches"] = 1 # Placeholder
if delivery_errors:
output["success"] = False
output["error"] = delivery_errors[0]
else:
output["success"] = True
output["sentBatches"] = 1

except json.JSONDecodeError as e:
output["error"] = f"Invalid JSON input: {e}"
Expand Down
17 changes: 15 additions & 2 deletions segment/analytics/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class DefaultConfig(object):
max_queue_size = 10000
gzip = False
timeout = 15
max_retries = 10
max_retries = 1000
max_total_backoff_duration = 43200
max_rate_limit_duration = 43200
proxies = None
thread = 1
upload_interval = 0.5
Expand Down Expand Up @@ -65,9 +67,16 @@ def __init__(self,
oauth_client_key=DefaultConfig.oauth_client_key,
oauth_key_id=DefaultConfig.oauth_key_id,
oauth_auth_server=DefaultConfig.oauth_auth_server,
oauth_scope=DefaultConfig.oauth_scope,):
oauth_scope=DefaultConfig.oauth_scope,
max_total_backoff_duration=DefaultConfig.max_total_backoff_duration,
max_rate_limit_duration=DefaultConfig.max_rate_limit_duration,):
require('write_key', write_key, str)

if max_total_backoff_duration is not None and max_total_backoff_duration < 0:
raise ValueError('max_total_backoff_duration must be non-negative')
if max_rate_limit_duration is not None and max_rate_limit_duration < 0:
raise ValueError('max_rate_limit_duration must be non-negative')

self.queue = queue.Queue(max_queue_size)
self.write_key = write_key
self.on_error = on_error
Expand All @@ -78,6 +87,8 @@ def __init__(self,
self.gzip = gzip
self.timeout = timeout
self.proxies = proxies
self.max_total_backoff_duration = max_total_backoff_duration
self.max_rate_limit_duration = max_rate_limit_duration
self.oauth_manager = None
if(oauth_client_id and oauth_client_key and oauth_key_id):
self.oauth_manager = OauthManager(oauth_client_id, oauth_client_key, oauth_key_id,
Expand Down Expand Up @@ -110,6 +121,8 @@ def __init__(self,
upload_size=upload_size, upload_interval=upload_interval,
gzip=gzip, retries=max_retries, timeout=timeout,
proxies=proxies, oauth_manager=self.oauth_manager,
max_total_backoff_duration=max_total_backoff_duration,
max_rate_limit_duration=max_rate_limit_duration,
)
self.consumers.append(consumer)

Expand Down
238 changes: 202 additions & 36 deletions segment/analytics/consumer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging
import time
import random
from threading import Thread
import backoff
import json

from segment.analytics.request import post, APIError, DatetimeSerializer
from segment.analytics.request import post, APIError, DatetimeSerializer, parse_retry_after

from queue import Empty

Expand All @@ -14,6 +14,10 @@
# lower to leave space for extra data that will be added later, eg. "sentAt".
BATCH_SIZE_LIMIT = 475000

# Default duration limits (12 hours in seconds)
DEFAULT_MAX_TOTAL_BACKOFF_DURATION = 43200
DEFAULT_MAX_RATE_LIMIT_DURATION = 43200


class FatalError(Exception):
def __init__(self, message):
Expand All @@ -29,8 +33,10 @@ class Consumer(Thread):
log = logging.getLogger('segment')

def __init__(self, queue, write_key, upload_size=100, host=None,
on_error=None, upload_interval=0.5, gzip=False, retries=10,
timeout=15, proxies=None, oauth_manager=None):
on_error=None, upload_interval=0.5, gzip=False, retries=1000,
timeout=15, proxies=None, oauth_manager=None,
max_total_backoff_duration=DEFAULT_MAX_TOTAL_BACKOFF_DURATION,
max_rate_limit_duration=DEFAULT_MAX_RATE_LIMIT_DURATION):
"""Create a consumer thread."""
Thread.__init__(self)
# Make consumer a daemon thread so that it doesn't block program exit
Expand All @@ -51,6 +57,12 @@ def __init__(self, queue, write_key, upload_size=100, host=None,
self.timeout = timeout
self.proxies = proxies
self.oauth_manager = oauth_manager
self.max_total_backoff_duration = max_total_backoff_duration
self.max_rate_limit_duration = max_rate_limit_duration

# Rate-limit state
self.rate_limited_until = None
self.rate_limit_start_time = None

def run(self):
"""Runs the consumer."""
Expand All @@ -64,16 +76,77 @@ def pause(self):
"""Pause the consumer."""
self.running = False

def set_rate_limit_state(self, response):
"""Set rate-limit state from a 429 response with a valid Retry-After header."""
retry_after = parse_retry_after(response) if response is not None else None
if retry_after is not None:
self.rate_limited_until = time.time() + retry_after
if self.rate_limit_start_time is None:
self.rate_limit_start_time = time.time()

def clear_rate_limit_state(self):
"""Clear rate-limit state after successful request or duration exceeded."""
self.rate_limited_until = None
self.rate_limit_start_time = None

def upload(self):
"""Upload the next batch of items, return whether successful."""
success = False
batch = self.next()
if len(batch) == 0:
return False

# Check rate-limit state before attempting upload
if self.rate_limited_until is not None:
now = time.time()

# Check if maxRateLimitDuration has been exceeded
if (self.rate_limit_start_time is not None and
now - self.rate_limit_start_time > self.max_rate_limit_duration):
self.log.error(
'Rate limit duration exceeded (%ds). Clearing rate-limit state and dropping batch.',
self.max_rate_limit_duration
)
self.clear_rate_limit_state()
# Drop the batch by marking items as done
if self.on_error:
self.on_error(
Exception('Rate limit duration exceeded, batch dropped'),
batch
)
for _ in batch:
self.queue.task_done()
return False

# Still rate-limited; wait until the rate limit expires
wait_time = self.rate_limited_until - now
if wait_time > 0:
self.log.debug(
'Rate-limited. Waiting %.2fs before next upload attempt.',
wait_time
)
time.sleep(wait_time)

try:
self.request(batch)
# Success — clear rate-limit state
self.clear_rate_limit_state()
success = True
except APIError as e:
if e.status == 429 and self.rate_limited_until is not None:
# 429: rate-limit state already set by request(). Re-queue batch.
self.log.debug('429 received. Re-queuing batch and halting upload iteration.')
for item in batch:
try:
self.queue.put(item, block=False)
except Exception:
pass # Queue full, item lost
success = False
else:
self.log.error('error uploading: %s', e)
success = False
if self.on_error:
self.on_error(e, batch)
except Exception as e:
self.log.error('error uploading: %s', e)
success = False
Expand Down Expand Up @@ -120,40 +193,133 @@ def next(self):
return items

def request(self, batch):
"""Attempt to upload the batch and retry before raising an error """

def fatal_exception(exc):
if isinstance(exc, APIError):
# retry on server errors and client errors
# with 429 status code (rate limited),
# don't retry on other client errors
return (400 <= exc.status < 500) and exc.status != 429
elif isinstance(exc, FatalError):
"""Attempt to upload the batch and retry before raising an error"""

def is_retryable_status(status):
"""
Determine if a status code is retryable.
Retryable 4xx: 408, 410, 429, 460
Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx
Retryable 5xx: All except 501, 505
- 511 is only retryable when OauthManager is configured
Non-retryable 5xx: 501, 505
"""
if 400 <= status < 500:
return status in (408, 410, 429, 460)
elif 500 <= status < 600:
if status in (501, 505):
return False
if status == 511:
return self.oauth_manager is not None
return True
else:
# retry on all other errors (eg. network)
return False
return False

def calculate_backoff_delay(attempt):
"""
Calculate exponential backoff delay with jitter.
First retry is immediate, then 0.5s, 1s, 2s, 4s, etc.
"""
if attempt == 1:
return 0 # First retry is immediate
base_delay = 0.5 * (2 ** (attempt - 2))
jitter = random.uniform(0, 0.1 * base_delay)
return min(base_delay + jitter, 60) # Cap at 60 seconds

total_attempts = 0
backoff_attempts = 0
first_failure_time = None

while True:
total_attempts += 1

attempt_count = 0

@backoff.on_exception(
backoff.expo,
Exception,
max_tries=self.retries + 1,
giveup=fatal_exception,
on_backoff=lambda details: self.log.debug(
f"Retry attempt {details['tries']}/{self.retries + 1} after {details['elapsed']:.2f}s"
))
def send_request():
nonlocal attempt_count
attempt_count += 1
try:
return post(self.write_key, self.host, gzip=self.gzip,
timeout=self.timeout, batch=batch, proxies=self.proxies,
oauth_manager=self.oauth_manager)
except Exception as e:
if attempt_count >= self.retries + 1:
self.log.error(f"All {self.retries} retries exhausted. Final error: {e}")
# Make the request with current retry count
response = post(
self.write_key,
self.host,
gzip=self.gzip,
timeout=self.timeout,
batch=batch,
proxies=self.proxies,
oauth_manager=self.oauth_manager,
retry_count=total_attempts - 1
)
# Success
return response

except FatalError as e:
# Non-retryable error
self.log.error(f"Fatal error after {total_attempts} attempts: {e}")
raise

send_request()
except APIError as e:
# 429 with valid Retry-After: set rate-limit state and raise
# to caller (pipeline blocking). Without Retry-After, fall
# through to counted backoff like any other retryable error.
if e.status == 429:
retry_after = parse_retry_after(e.response) if e.response is not None else None
if retry_after is not None:
self.set_rate_limit_state(e.response)
raise

# Check if status is retryable
if not is_retryable_status(e.status):
self.log.error(
f"Non-retryable error {e.status} after {total_attempts} attempts: {e}"
)
raise

# Transient error -- per-batch backoff
if first_failure_time is None:
first_failure_time = time.time()
if time.time() - first_failure_time > self.max_total_backoff_duration:
self.log.error(
f"Max total backoff duration ({self.max_total_backoff_duration}s) exceeded "
f"after {total_attempts} attempts. Final error: {e}"
)
raise

# Count this against backoff attempts
backoff_attempts += 1
if backoff_attempts >= self.retries + 1:
self.log.error(
f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}"
)
raise

# Calculate exponential backoff delay with jitter
delay = calculate_backoff_delay(backoff_attempts)

self.log.debug(
f"Retry attempt {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) "
f"after {delay:.2f}s for status {e.status}"
)
time.sleep(delay)

except Exception as e:
# Network errors or other exceptions - retry with backoff
if first_failure_time is None:
first_failure_time = time.time()
if time.time() - first_failure_time > self.max_total_backoff_duration:
self.log.error(
f"Max total backoff duration ({self.max_total_backoff_duration}s) exceeded "
f"after {total_attempts} attempts. Final error: {e}"
)
raise

backoff_attempts += 1

if backoff_attempts >= self.retries + 1:
self.log.error(
f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}"
)
raise

# Calculate exponential backoff delay with jitter
delay = calculate_backoff_delay(backoff_attempts)

self.log.debug(
f"Network error retry {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) "
f"after {delay:.2f}s: {e}"
)
time.sleep(delay)
Loading