Email Drafter Feature
Overview
The Email Drafter feature is an AI-powered system that automatically monitors Gmail accounts for incoming emails and generates draft responses when appropriate. The system uses Google Cloud Pub/Sub for real-time email notifications, Temporal workflows for reliable processing, and AI models to generate contextual email responses.
Architecture
Setup
1. Google Cloud Platform Configuration
Enable Pub/Sub API
- Go to Google Cloud Console
- Navigate to APIs & Services → Library
- Search for "Pub/Sub API" and enable it
Create Pub/Sub Topic
- Navigate to Pub/Sub → Topics
- Click Create Topic
- Set topic ID: gmail-notifications
- Important: Uncheck "Add default subscription"
- Click Create
Configure Topic Permissions
- Go to the newly created gmail-notifications topic
- Navigate to the Permissions tab
- Click + Add Principal
- Add principal: gmail-api-push@system.gserviceaccount.com
- Assign role: Pub/Sub Publisher
- Click Save
Create Subscription
- Go to Subscriptions in the Pub/Sub console
- Click Create Subscription
- Set subscription ID: gmail-subscription
- Select topic: gmail-notifications
- Configure settings:
  - Delivery type: Pull
  - Message retention: 7 days
  - Expiration period: Never expire
  - Ack deadline: 300s
  - ✅ Enable exactly once delivery
  - ✅ Enable message ordering
- Click Create
2. Service Account Configuration
Create Service Account
- Go to IAM & Admin → Service Accounts
- Select your project
- Click Create Service Account
- Grant the following roles:
  - Pub/Sub Subscriber (to pull messages)
  - Pub/Sub Editor (to manage subscriptions)
Generate Service Account Key
- Click on the created service account
- Go to Keys tab
- Click Add Key → Create new key → JSON
- Download the JSON file (e.g., google-credentials.json)
3. Environment Variables
Add the following environment variables to your .env file:
# Google Cloud Configuration
GOOGLE_CLOUD_PROJECT_ID=your-project-id
GOOGLE_SERVICE_ACCOUNT_JSON={"type":"service_account","project_id":"..."}
GMAIL_PUBSUB_TOPIC=gmail-notifications
GMAIL_PUBSUB_SUBSCRIPTION=gmail-subscription
# Google OAuth (for user authentication)
GOOGLE_OAUTH_CLIENT_ID=your-oauth-client-id
GOOGLE_OAUTH_CLIENT_SECRET=your-oauth-client-secret
Note: The GOOGLE_SERVICE_ACCOUNT_JSON should contain the entire JSON content from the downloaded service account key file.
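A quick startup check can catch a missing variable or a truncated service account key before the consumer starts. The following is a minimal sketch, not part of the feature itself; the function name `check_email_drafter_env` is hypothetical, and it assumes the variables are exposed through the process environment.

```python
import json
import os

# Variable names taken from the .env listing above.
REQUIRED_VARS = (
    "GOOGLE_CLOUD_PROJECT_ID",
    "GOOGLE_SERVICE_ACCOUNT_JSON",
    "GMAIL_PUBSUB_TOPIC",
    "GMAIL_PUBSUB_SUBSCRIPTION",
    "GOOGLE_OAUTH_CLIENT_ID",
    "GOOGLE_OAUTH_CLIENT_SECRET",
)


def check_email_drafter_env(env=None):
    """Return a list of problems; an empty list means the config looks usable."""
    env = os.environ if env is None else env
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]

    raw_key = env.get("GOOGLE_SERVICE_ACCOUNT_JSON")
    if raw_key:
        try:
            key = json.loads(raw_key)
        except json.JSONDecodeError as exc:
            problems.append(f"GOOGLE_SERVICE_ACCOUNT_JSON is not valid JSON: {exc.msg}")
        else:
            # A downloaded service account key always carries this type marker.
            if not isinstance(key, dict) or key.get("type") != "service_account":
                problems.append('GOOGLE_SERVICE_ACCOUNT_JSON must be a key of type "service_account"')
    return problems
```

Calling `check_email_drafter_env()` with no arguments inspects `os.environ`; passing a dict makes the check easy to unit test.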
How It Works
1. Gmail API Push Notifications
The system uses Gmail's push notification feature to receive real-time updates when new emails arrive in a user's inbox.
Watch Setup Process
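import datetime


def start_gmail_watch(user: User):
    """Set up Gmail push notifications for a user"""
    gmail_client = get_user_gmail_client(user)
    watch_request = {
        "labelIds": ["INBOX"],
        # Gmail expects the full topic resource name; this assumes
        # settings.GMAIL_PUBSUB_TOPIC holds only the bare topic ID
        "topicName": f"projects/{settings.GOOGLE_CLOUD_PROJECT_ID}/topics/{settings.GMAIL_PUBSUB_TOPIC}",
    }
    result = gmail_client.users().watch(userId="me", body=watch_request).execute()
    # "expiration" is returned as a string of epoch milliseconds
    expires_at = datetime.datetime.fromtimestamp(
        int(result["expiration"]) / 1000, tz=datetime.timezone.utc
    )
    return result["historyId"], expires_at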
Key Points:
- Monitors only the INBOX label for new messages
- Sends notifications to the configured Pub/Sub topic
- Returns a historyId for tracking email changes
- Watch expires after 7 days and needs renewal
Watch Renewal
The system automatically renews Gmail watches before they expire:
- Checks every 12 hours for watches expiring within 24 hours
- Renews watches for users with enabled assistants
- Prevents gaps in email monitoring
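The selection step of the renewal check can be sketched as a pure function. This is an illustration under assumptions: the function name `watches_due_for_renewal` and the `{user_id: expires_at}` mapping are hypothetical stand-ins for however the system stores watch expirations.

```python
import datetime


def watches_due_for_renewal(watch_expirations, now, window=datetime.timedelta(hours=24)):
    """Return the user IDs whose watch expires within `window` of `now`.

    `watch_expirations` maps a user ID to the timezone-aware expiry of that
    user's watch. Watches that have already expired are included as well.
    """
    cutoff = now + window
    return sorted(
        user_id for user_id, expires_at in watch_expirations.items() if expires_at <= cutoff
    )
```

Because the check runs every 12 hours and looks 24 hours ahead, each watch falls inside the window on at least two consecutive runs, so a single failed renewal attempt does not by itself cause a gap.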
2. Pub/Sub Message Consumption
The system consumes Gmail notifications from the Pub/Sub subscription using a dedicated consumer.
Consumer Implementation
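import json

from google.cloud import pubsub_v1
from google.oauth2 import service_account


class GmailNotificationsConsumer:
    def __init__(self):
        # Build credentials from the JSON key stored in the environment
        service_account_json = json.loads(settings.GOOGLE_SERVICE_ACCOUNT_JSON)
        credentials = service_account.Credentials.from_service_account_info(
            service_account_json
        )
        self.subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
        # Full path: projects/<project>/subscriptions/<subscription>
        self.subscription_path = self.subscriber.subscription_path(
            settings.GOOGLE_CLOUD_PROJECT_ID,
            settings.GMAIL_PUBSUB_SUBSCRIPTION,
        )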
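The consumer's `consume_messages` method is not shown here. As a rough sketch of the decoding step it would need, assuming each pulled message exposes its body as UTF-8 JSON bytes (as `message.data` does in the Pub/Sub Python client), a Gmail notification can be parsed like this. The function name `parse_gmail_notification` is hypothetical.

```python
import json


def parse_gmail_notification(data: bytes) -> dict:
    """Decode the payload of one Gmail push notification.

    The payload is a small JSON object carrying the mailbox address and the
    history ID at which the mailbox changed.
    """
    payload = json.loads(data.decode("utf-8"))
    return {
        "emailAddress": payload["emailAddress"],
        # historyId may arrive as a number or a string; normalise to str
        "historyId": str(payload["historyId"]),
    }
```

When messages are read through the REST API or a push endpoint instead of the client library, the `data` field arrives base64-encoded and has to be decoded before this step.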
Message Processing
def beat_consume_gmail_notifications():
    """Consumes the Gmail notifications from the Pub/Sub subscription"""
    messages = consumer.consume_messages()
    for message in messages:
        email_address = message.get("emailAddress")
        history_id = str(message.get("historyId"))
        # Find users associated with this email address
        user_ids = GoogleOAuthToken.objects.filter(email=email_address).values_list("user_id", flat=True)
        # Assumes a Django User model keyed by the IDs collected above
        for user in User.objects.filter(id__in=user_ids):
            if is_assistant_enabled_for_user(user):
                # `assistant` is the user's assistant record; its lookup is
                # omitted from this excerpt
                # Retrieve new emails between history IDs
                emails = retrieve_emails_between_history(user, assistant.history_id, history_id)
                for new_email in emails:
                    if not should_ignore_message_based_on_labels(new_email):
                        start_assistant_draft_workflow(user, new_email)
Key Features:
- Pulls messages from Pub/Sub subscription
- Handles multiple users per email address
- Tracks history IDs to avoid duplicate processing
- Filters out messages that don't need responses (those labeled DRAFT, SENT, or SPAM)
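The filtering and history-tracking checks listed above could look roughly like the sketch below. Both function names are hypothetical and the real `should_ignore_message_based_on_labels` may apply additional rules; the sketch assumes messages are dicts with a `labelIds` list, as returned by the Gmail API.

```python
# Labels named in the feature list above
IGNORED_LABELS = frozenset({"DRAFT", "SENT", "SPAM"})


def should_skip_message(message: dict) -> bool:
    """True when the message carries any label that rules out an automatic reply."""
    return not IGNORED_LABELS.isdisjoint(message.get("labelIds", []))


def is_newer_history(stored_history_id: str, notified_history_id: str) -> bool:
    """True when the notification points past the history ID already processed."""
    # Compare numerically: as strings, "999" would sort after "1001".
    return int(notified_history_id) > int(stored_history_id)
```

Skipping notifications whose history ID is not newer than the stored one is one way to keep a redelivered Pub/Sub message from triggering a second draft.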
3. Temporal Workflow Execution
When a new email is detected, the system starts a Temporal workflow to handle the AI processing and draft creation.
Workflow Definition
import datetime

from temporalio import workflow
from temporalio.common import RetryPolicy


@workflow.defn(name="AutoDrafterWorkflow")
class AutoDrafterWorkflow(BaseWorkflow):
    @workflow.run
    async def run(self, user_id: int, message: dict) -> None:
        # Generate AI draft
        generated_draft = await workflow.execute_activity(
            generate_draft,
            args=[user_id, message],
            start_to_close_timeout=datetime.timedelta(seconds=600),
            retry_policy=RetryPolicy(
                initial_interval=datetime.timedelta(minutes=1),
                maximum_interval=datetime.timedelta(minutes=5),
                backoff_coefficient=1.15,
                maximum_attempts=5,
            ),
        )
        # An empty result means the AI decided no reply is needed
        if generated_draft:
            # Publish draft to Gmail
            await workflow.execute_activity(
                publish_draft,
                args=[user_id, message, generated_draft],
                start_to_close_timeout=datetime.timedelta(seconds=60),
            )
Workflow Features:
- Reliable execution with retry policies
- Timeout handling (10 minutes for AI generation, 1 minute for draft creation)
- Error handling and logging
- Unique workflow IDs per user and message
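To make the retry policy concrete, the waits between attempts can be computed directly from its four parameters. This is a small arithmetic sketch of the usual rule (initial interval multiplied by the coefficient after each failure, capped at the maximum interval); the helper name `retry_delays` is hypothetical.

```python
def retry_delays(initial_s, coefficient, maximum_s, maximum_attempts):
    """Seconds waited before each retry: initial * coefficient**n, capped at maximum."""
    return [
        min(initial_s * coefficient**n, maximum_s)
        for n in range(maximum_attempts - 1)  # the first attempt has no preceding wait
    ]


# Values from the generate_draft retry policy above
delays = retry_delays(initial_s=60, coefficient=1.15, maximum_s=300, maximum_attempts=5)
print([round(d, 2) for d in delays])  # [60.0, 69.0, 79.35, 91.25]
```

With these settings the 5-minute cap is never reached, and the four waits add up to about 300 seconds. In the worst case, where all five attempts run to the 600-second timeout, the activity gives up after roughly 55 minutes.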
4. AI Response Generation
The system uses AI models (Claude or OpenAI) to generate contextual email responses.
AI Client Integration
def generate_email_response(user: User, email_sender: str, email_subject: str, email_content: str) -> str:
    client = get_client(user.account, model=LATEST_CLAUDE_MODEL, user=user)
    # Named prompt_input to avoid shadowing the built-in input()
    prompt_input = f"Email sender: {email_sender}\nEmail subject: {email_subject}\nEmail content: {email_content}"
    response = client.generate_response_for_email(prompt_input)
    return response
AI Prompt System
The system uses specialized prompts for email response generation:
Email Responder Prompt Features:
- Determines if an email needs a response
- Generates professional, human-like responses
- Incorporates business context and memories
- Returns structured JSON with decision and response
Prompt Structure:
{
  "should_respond": true/false,
  "response": "email response content or empty string"
}
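Model output is not guaranteed to be well-formed, so the caller has to validate it before creating a draft. The sketch below shows one defensive way to read the structure above; the function name `extract_draft` is hypothetical, and treating malformed output as "do not respond" is an assumption rather than documented behavior.

```python
import json
from typing import Optional


def extract_draft(raw_model_output: str) -> Optional[str]:
    """Return the reply text if the model decided to respond, otherwise None."""
    try:
        decision = json.loads(raw_model_output)
    except json.JSONDecodeError:
        return None  # malformed output is treated as "do not respond"
    if not isinstance(decision, dict) or decision.get("should_respond") is not True:
        return None
    response = decision.get("response")
    # An empty or whitespace-only reply is not worth drafting
    if not isinstance(response, str) or not response.strip():
        return None
    return response
```

Returning `None` for every non-reply case lets the caller use a simple truthiness check to decide whether to publish a draft.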
Response Evaluation Criteria:
- Subject line analysis (newsletters, notifications vs. direct communication)
- Sender analysis (automated vs. human senders)
- Content analysis (questions, requests, invitations for discussion)
- Contextual relevance to business needs
5. Draft Creation and Label Management
Once the AI generates a response, the system creates a Gmail draft and applies appropriate labels.
Draft Creation
import base64


def create_draft_reply(user: User, message_id: str, html: str):
    """Create a draft reply to a specific message"""
    gmail_client = get_user_gmail_client(user)
    # Get original message details
    original_message = gmail_client.users().messages().get(
        userId="me", id=message_id, format="full"
    ).execute()
    # In-Reply-To and References take the RFC 2822 Message-ID header,
    # not the Gmail message ID or thread ID
    headers = {h["name"].lower(): h["value"] for h in original_message["payload"]["headers"]}
    original_message_id = headers.get("message-id", "")
    # Create reply message
    reply_message = create_message(
        user=user,
        to_email_addresses=[extract_sender_email(original_message)],
        subject=f"Re: {extract_subject(original_message)}",
        html=html,
        in_reply_to=original_message_id,
        references=f"{headers.get('references', '')} {original_message_id}".strip(),
    )
    # Create the draft inside the original thread
    raw = base64.urlsafe_b64encode(reply_message.as_bytes()).decode()
    draft = gmail_client.users().drafts().create(
        userId="me",
        body={"message": {"raw": raw, "threadId": original_message["threadId"]}},
    ).execute()
    return draft
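The `create_message` helper is not shown here. As an illustration of what a threaded reply needs at the MIME level, the following sketch builds one with the standard library only. The function name `build_reply_raw` and its parameters are hypothetical, and `original_message_id` is assumed to be the original email's `Message-ID` header value.

```python
import base64
from email.message import EmailMessage


def build_reply_raw(sender, to, subject, html, original_message_id, original_references=""):
    """Build a threaded HTML reply and return it base64url-encoded for the Gmail `raw` field."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = to
    # Avoid stacking prefixes when the original subject is already a reply
    msg["Subject"] = subject if subject.lower().startswith("re:") else f"Re: {subject}"
    # Both headers reference the original Message-ID so mail clients thread the reply
    msg["In-Reply-To"] = original_message_id
    msg["References"] = f"{original_references} {original_message_id}".strip()
    msg.set_content(html, subtype="html")
    return base64.urlsafe_b64encode(msg.as_bytes()).decode("ascii")
```

For Gmail to place the draft in the existing conversation, the request body should also carry the original message's `threadId` next to `raw`.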
Label Management
def add_ghost_label_to_message(user: User, message_id: str):
    """Add 'Ghost' label to indicate AI-generated draft"""
    add_labels_to_message(user, message_id, ["Ghost"])


def add_labels_to_message(user: User, message_id: str, labels: list[str]):
    """Add labels to a message, creating them if they don't exist"""
    gmail_client = get_user_gmail_client(user)
    # Mapping of label name -> label ID
    labels_available = get_available_labels(user)
    # Create missing labels and record their new IDs so they are applied below
    for label in labels:
        if label not in labels_available:
            created = gmail_client.users().labels().create(
                userId="me", body={"name": label}
            ).execute()
            labels_available[label] = created["id"]
    # Add labels to message
    labels_to_add = [labels_available[label] for label in labels]
    gmail_client.users().messages().modify(
        userId="me", id=message_id, body={"addLabelIds": labels_to_add}
    ).execute()
Label Features:
- Automatically creates "Ghost" label for AI-generated drafts
- Creates labels on-demand if they don't exist
- Applies labels to both original messages and drafts
- Helps users identify AI-generated content
Data Flow
- Email Arrives: New email arrives in user's Gmail inbox
- Push Notification: Gmail sends push notification to Pub/Sub topic
- Message Consumption: System pulls notification from Pub/Sub subscription
- User Lookup: System finds all users associated with the email address
- Email Retrieval: System fetches new emails using Gmail History API
- Filtering: System filters out emails that don't need responses
- Workflow Trigger: System starts Temporal workflow for each qualifying email
- AI Processing: AI analyzes email and generates response
- Draft Creation: System creates Gmail draft with AI-generated response
- Label Application: System applies "Ghost" label to identify AI-generated content
Error Handling
The system includes comprehensive error handling at multiple levels:
- Gmail API Errors: Retry logic for API rate limits and temporary failures
- Pub/Sub Errors: Message acknowledgment and dead letter handling
- Temporal Workflow Errors: Retry policies with exponential backoff
- AI Generation Errors: Fallback handling for AI service failures
- Database Errors: Transaction rollback and logging
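For the Gmail API and AI service cases, retry logic commonly takes the form of capped exponential backoff with jitter. The sketch below is a generic illustration, not the system's actual implementation; `call_with_backoff` and its parameters are hypothetical, and the caller supplies `is_retryable` to decide which exceptions count as temporary (for example, HTTP 429 or 5xx responses).

```python
import random
import time


def call_with_backoff(fn, is_retryable, max_attempts=5, base_delay_s=1.0,
                      max_delay_s=30.0, sleep=time.sleep):
    """Call `fn`, retrying retryable failures with capped exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Give up on the last attempt or on errors that will not go away
            if attempt == max_attempts or not is_retryable(exc):
                raise
            delay = min(base_delay_s * 2 ** (attempt - 1), max_delay_s)
            # Jitter spreads out retries from workers that failed together
            sleep(delay + random.uniform(0, delay / 2))
```

The `sleep` parameter is injectable so the behavior can be tested without real waiting.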
Monitoring and Logging
The system provides detailed logging for:
- Email notification processing
- AI response generation
- Draft creation success/failure
- Error conditions and retry attempts
- Performance metrics
Security Considerations
- OAuth Tokens: User credentials stored securely with proper scopes
- Service Account: Minimal required permissions for Pub/Sub access
- Data Privacy: Email content processed in-memory, not stored permanently
- Access Control: Feature flag controls for enabling/disabling per user
Configuration Options
- Feature Flags: Enable/disable assistant per user
- AI Models: Configurable AI model selection (Claude, OpenAI)
- Retry Policies: Customizable retry intervals and attempts
- Watch Renewal: Configurable renewal frequency
- Label Management: Customizable label names and behavior
Troubleshooting
Common Issues
- Watch Expiration: Ensure watch renewal task is running
- Pub/Sub Permissions: Verify service account has correct roles
- OAuth Scopes: Ensure Gmail API scopes include watch permissions
- AI Rate Limits: Monitor AI service usage and implement backoff
- Temporal Worker: Ensure Temporal worker is running and healthy
Debug Steps
- Check Gmail watch status in user's assistant record
- Verify Pub/Sub subscription is active and receiving messages
- Review Temporal workflow execution history
- Check AI service logs for generation failures
- Verify Gmail API quota and rate limits