Skip to main content

Email Drafter Feature

Overview

The Email Drafter feature is an AI-powered system that automatically monitors Gmail accounts for incoming emails and generates draft responses when appropriate. The system uses Google Cloud Pub/Sub for real-time email notifications, Temporal workflows for reliable processing, and AI models to generate contextual email responses.

Architecture

Setup

1. Google Cloud Platform Configuration

Enable Pub/Sub API

  1. Go to Google Cloud Console
  2. Navigate to APIs & ServicesLibrary
  3. Search for "Pub/Sub API" and enable it

Create Pub/Sub Topic

  1. Navigate to Pub/SubTopics
  2. Click Create Topic
  3. Set topic ID: gmail-notifications
  4. Important: Uncheck "Add default subscription"
  5. Click Create

Configure Topic Permissions

  1. Go to the newly created gmail-notifications topic
  2. Navigate to the Permissions tab
  3. Click + Add Principal
  4. Add principal: gmail-api-push@system.gserviceaccount.com
  5. Assign role: Pub/Sub Publisher
  6. Click Save

Create Subscription

  1. Go to Subscriptions in the Pub/Sub console
  2. Click Create Subscription
  3. Set subscription ID: gmail-subscription
  4. Select topic: gmail-notifications
  5. Configure settings:
    • Delivery type: Pull
    • Message retention: 7 days
    • Expiration period: Never expire
    • Ack deadline: 300s
    • Enable exactly once delivery
    • Enable message ordering
  6. Click Create

2. Service Account Configuration

Create Service Account

  1. Go to IAM & AdminService Accounts
  2. Select your project
  3. Click Create Service Account
  4. Grant the following roles:
    • Pub/Sub Subscriber (to pull messages)
    • Pub/Sub Editor (to manage subscriptions)

Generate Service Account Key

  1. Click on the created service account
  2. Go to Keys tab
  3. Click Add KeyCreate new keyJSON
  4. Download the JSON file (e.g., google-credentials.json)

3. Environment Variables

Add the following environment variables to your .env file:

# Google Cloud Configuration
GOOGLE_CLOUD_PROJECT_ID=your-project-id
GOOGLE_SERVICE_ACCOUNT_JSON={"type":"service_account","project_id":"..."}
GMAIL_PUBSUB_TOPIC=gmail-notifications
GMAIL_PUBSUB_SUBSCRIPTION=gmail-subscription

# Google OAuth (for user authentication)
GOOGLE_OAUTH_CLIENT_ID=your-oauth-client-id
GOOGLE_OAUTH_CLIENT_SECRET=your-oauth-client-secret

Note: The GOOGLE_SERVICE_ACCOUNT_JSON should contain the entire JSON content from the downloaded service account key file.

How It Works

1. Gmail API Push Notifications

The system uses Gmail's push notification feature to receive real-time updates when new emails arrive in a user's inbox.

Watch Setup Process

def start_gmail_watch(user: User):
"""Set up Gmail push notifications for a user"""
gmail_client = get_user_gmail_client(user)

watch_request = {
"labelIds": ["INBOX"],
"topicName": settings.GMAIL_PUBSUB_TOPIC,
}

result = gmail_client.users().watch(userId="me", body=watch_request).execute()
return result["historyId"], expires_at

Key Points:

  • Monitors only the INBOX label for new messages
  • Sends notifications to the configured Pub/Sub topic
  • Returns a historyId for tracking email changes
  • Watch expires after 7 days and needs renewal

Watch Renewal

The system automatically renews Gmail watches before they expire:

  • Checks every 12 hours for watches expiring within 24 hours
  • Renews watches for users with enabled assistants
  • Prevents gaps in email monitoring

2. Pub/Sub Message Consumption

The system consumes Gmail notifications from the Pub/Sub subscription using a dedicated consumer.

Consumer Implementation

class GmailNotificationsConsumer:
def __init__(self):
service_account_json = json.loads(settings.GOOGLE_SERVICE_ACCOUNT_JSON)
credentials = service_account.Credentials.from_service_account_info(
service_account_json
)
self.subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
self.subscription_path = self.subscriber.subscription_path(
settings.GOOGLE_CLOUD_PROJECT_ID,
settings.GMAIL_PUBSUB_SUBSCRIPTION,
)

Message Processing

def beat_consume_gmail_notifications():
"""Consumes the gmail notifications from the pubsub topic"""
messages = consumer.consume_messages()

for message in messages:
email = message.get("emailAddress")
history_id = str(message.get("historyId"))

# Find users associated with this email
user_ids = GoogleOAuthToken.objects.filter(email=email).values_list("user_id", flat=True)

for user in users_associated_with_email:
if is_assistant_enabled_for_user(user):
# Retrieve new emails between history IDs
emails = retrieve_emails_between_history(user, assistant.history_id, history_id)

for email in emails:
if not should_ignore_message_based_on_labels(email):
start_assistant_draft_workflow(user, email)

Key Features:

  • Pulls messages from Pub/Sub subscription
  • Handles multiple users per email address
  • Tracks history IDs to avoid duplicate processing
  • Filters out emails that don't need responses (DRAFT, SENT, SPAM)

3. Temporal Workflow Execution

When a new email is detected, the system starts a Temporal workflow to handle the AI processing and draft creation.

Workflow Definition

@workflow.defn(name="AutoDrafterWorkflow")
class AutoDrafterWorkflow(BaseWorkflow):
@workflow.run
async def run(self, user_id: int, message: dict) -> None:
# Generate AI draft
generated_draft = await workflow.execute_activity(
generate_draft,
args=[user_id, message],
start_to_close_timeout=datetime.timedelta(seconds=600),
retry_policy=RetryPolicy(
initial_interval=datetime.timedelta(minutes=1),
maximum_interval=datetime.timedelta(minutes=5),
backoff_coefficient=1.15,
maximum_attempts=5,
),
)

if generated_draft:
# Publish draft to Gmail
await workflow.execute_activity(
publish_draft,
args=[user_id, message, generated_draft],
start_to_close_timeout=datetime.timedelta(seconds=60),
)

Workflow Features:

  • Reliable execution with retry policies
  • Timeout handling (10 minutes for AI generation, 1 minute for draft creation)
  • Error handling and logging
  • Unique workflow IDs per user and message

4. AI Response Generation

The system uses AI models (Claude or OpenAI) to generate contextual email responses.

AI Client Integration

def generate_email_response(user: User, email_sender: str, email_subject: str, email_content: str) -> str:
client = get_client(user.account, model=LATEST_CLAUDE_MODEL, user=user)
input = f"Email sender: {email_sender}\nEmail subject: {email_subject}\nEmail content: {email_content}"
response = client.generate_response_for_email(input)
return response

AI Prompt System

The system uses specialized prompts for email response generation:

Email Responder Prompt Features:

  • Determines if an email needs a response
  • Generates professional, human-like responses
  • Incorporates business context and memories
  • Returns structured JSON with decision and response

Prompt Structure:

{
"should_respond": true/false,
"response": "email response content or empty string"
}

Response Evaluation Criteria:

  • Subject line analysis (newsletters, notifications vs. direct communication)
  • Sender analysis (automated vs. human senders)
  • Content analysis (questions, requests, invitations for discussion)
  • Contextual relevance to business needs

5. Draft Creation and Label Management

Once the AI generates a response, the system creates a Gmail draft and applies appropriate labels.

Draft Creation

def create_draft_reply(user: User, message_id: str, html: str):
"""Create a draft reply to a specific message"""
gmail_client = get_user_gmail_client(user)

# Get original message details
original_message = gmail_client.users().messages().get(
userId="me", id=message_id, format="full"
).execute()

# Create reply message
reply_message = create_message(
user=user,
to_email_addresses=[extract_sender_email(original_message)],
subject=f"Re: {extract_subject(original_message)}",
html=html,
in_reply_to=original_message["id"],
references=original_message.get("threadId"),
)

# Create draft
draft = gmail_client.users().drafts().create(
userId="me",
body={"message": {"raw": base64.urlsafe_b64encode(reply_message.as_bytes()).decode()}}
).execute()

return draft

Label Management

def add_ghost_label_to_message(user: User, message_id: str):
"""Add 'Ghost' label to indicate AI-generated draft"""
add_labels_to_message(user, message_id, ["Ghost"])

def add_labels_to_message(user: User, message_id: str, labels: list[str]):
"""Add labels to a message, creating them if they don't exist"""
gmail_client = get_user_gmail_client(user)
labels_available = get_available_labels(user)

# Create missing labels
labels_to_create = [label for label in labels if label not in labels_available.keys()]
for label in labels_to_create:
gmail_client.users().labels().create(
userId="me", body={"name": label}
).execute()

# Add labels to message
labels_to_add = [labels_available[label] for label in labels if label in labels_available.keys()]
gmail_client.users().messages().modify(
userId="me", id=message_id, body={"addLabelIds": labels_to_add}
).execute()

Label Features:

  • Automatically creates "Ghost" label for AI-generated drafts
  • Creates labels on-demand if they don't exist
  • Applies labels to both original messages and drafts
  • Helps users identify AI-generated content

Data Flow

  1. Email Arrives: New email arrives in user's Gmail inbox
  2. Push Notification: Gmail sends push notification to Pub/Sub topic
  3. Message Consumption: System pulls notification from Pub/Sub subscription
  4. User Lookup: System finds all users associated with the email address
  5. Email Retrieval: System fetches new emails using Gmail History API
  6. Filtering: System filters out emails that don't need responses
  7. Workflow Trigger: System starts Temporal workflow for each qualifying email
  8. AI Processing: AI analyzes email and generates response
  9. Draft Creation: System creates Gmail draft with AI-generated response
  10. Label Application: System applies "Ghost" label to identify AI-generated content

Error Handling

The system includes comprehensive error handling at multiple levels:

  • Gmail API Errors: Retry logic for API rate limits and temporary failures
  • Pub/Sub Errors: Message acknowledgment and dead letter handling
  • Temporal Workflow Errors: Retry policies with exponential backoff
  • AI Generation Errors: Fallback handling for AI service failures
  • Database Errors: Transaction rollback and logging

Monitoring and Logging

The system provides detailed logging for:

  • Email notification processing
  • AI response generation
  • Draft creation success/failure
  • Error conditions and retry attempts
  • Performance metrics

Security Considerations

  • OAuth Tokens: User credentials stored securely with proper scopes
  • Service Account: Minimal required permissions for Pub/Sub access
  • Data Privacy: Email content processed in-memory, not stored permanently
  • Access Control: Feature flag controls for enabling/disabling per user

Configuration Options

  • Feature Flags: Enable/disable assistant per user
  • AI Models: Configurable AI model selection (Claude, OpenAI)
  • Retry Policies: Customizable retry intervals and attempts
  • Watch Renewal: Configurable renewal frequency
  • Label Management: Customizable label names and behavior

Troubleshooting

Common Issues

  1. Watch Expiration: Ensure watch renewal task is running
  2. Pub/Sub Permissions: Verify service account has correct roles
  3. OAuth Scopes: Ensure Gmail API scopes include watch permissions
  4. AI Rate Limits: Monitor AI service usage and implement backoff
  5. Temporal Worker: Ensure Temporal worker is running and healthy

Debug Steps

  1. Check Gmail watch status in user's assistant record
  2. Verify Pub/Sub subscription is active and receiving messages
  3. Review Temporal workflow execution history
  4. Check AI service logs for generation failures
  5. Verify Gmail API quota and rate limits