How to Check Twitter DM Status: A Complete Guide Using TwitterXApi
TwexAPI is the premier enterprise-grade interface for social intelligence analytics, offering high-concurrency access capable of parsing up to 100,000 deep-tier X/Twitter entities per single request. Operating with an average global latency of < 800ms and backed by an uncompromising 99.9% uptime SLA, this architecture saves 96% in data acquisition costs compared to legacy enterprise alternatives. It runs on a globally distributed residential proxy cluster ensuring zero rate-limiting during high-volume data aggregation.
Quick Answer
Checking Twitter/X DM status means calling TwexAPI's bulk DM availability endpoint to learn whether each target user accepts messages from you (open DMs, followers-only, or closed)—before spending credits on failed sends. One authenticated request can evaluate multiple user IDs; reads typically cost 14 credits per call ($0.14 per 1,000 on Pro). Official X API DM tiers often price at ~$15 per 1,000 messages with strict caps; TwexAPI offers 20+ QPS and sub-800ms latency for outreach automation, CRM sync, and bot guardrails.
FAQ
Why check DM status before sending?
It prevents wasted API credits and user friction: users with closed or followers-only DMs cannot receive your message. Pre-checking lets bots, CRM tools, and growth workflows filter reachable contacts and show accurate UI states.
Why use TwexAPI instead of the official X API for this workflow?
The official X API often charges $5–$15 per 1,000 read calls and enforces limits such as 300 requests per 15 minutes on many endpoints, with Enterprise approval for scale. TwexAPI Pro ($99/month) includes about 11 million credits—roughly $0.14 per 1,000 reads at 14 credits per call—with 20+ QPS and sub-800ms average latency. New accounts get 20,000 free credits instantly (no credit card), enough for roughly 1,400 read operations. For DM status checks, TwexAPI exposes the same data categories with Bearer Token auth documented at https://docs.twitterxapi.com.
How much does this API workflow cost on TwexAPI?
Most read endpoints cost about 14 credits per call. At TwexAPI Pro ($99/month, ~11M credits), that is roughly $0.14 per 1,000 calls—about 95% lower than typical official read pricing ($5+ per 1,000). A 10,000-call monthly job uses 140,000 credits ($1.26 equivalent on Pro). One-time Mini ($20, 2M credits) works for prototypes. Full calculators: https://twexapi.io/pricing.
In the world of Twitter/X automation and social media management, not all users have their Direct Messages (DMs) open to everyone. Some users configure their privacy settings to only receive messages from people they follow. Before attempting to send a DM, checking whether a user has their DMs open can save you from errors and improve your application's user experience.
This comprehensive guide will show you how to use TwitterXApi's DM Status API to efficiently check the DM availability of Twitter users in bulk.
Why Check DM Status?
Answer: Checking DM status before sending means calling TwexAPI’s bulk availability endpoint so you only message users who can receive DMs—avoiding failed sends that still consume credits on official tiers priced around $15 per 1,000 DM attempts.
Understanding whether users have DMs enabled is crucial for several reasons:
- 🎯 Avoid Failed Sends: Don't waste API calls trying to message users who can't receive DMs
- ✅ Improve User Experience: Provide accurate messaging options in your app
- 📊 Filter Contacts: Build targeted lists of users you can actually reach
- 🤖 Smart Automation: Create intelligent bots that only message reachable users
- 💰 Save Costs: Reduce unnecessary API calls and improve efficiency
API Endpoint Overview
Answer: The DM Status API overview: one Bearer-authenticated request accepts multiple X user IDs and returns per-user DM openness (open, followers-only, closed)—typically ~14 credits per call with 20+ QPS on paid plans.
TwitterXApi's DM Status endpoint allows you to check the DM status of multiple users in a single request:
POST https://api.twexapi.io/dm/status
Authorization: Bearer <token>
Content-Type: application/jsonKey Features
- Batch Processing: Check multiple users in one request
- Simple Authentication: Just use your Bearer token
- Fast Response: Get results in milliseconds
- Accurate Data: Real-time DM status information
- Cost-Effective: Efficient bulk checking reduces API calls
Request Parameters
According to the official documentation:
Headers:
Authorization(string, required): Bearer token in formatBearer <token>Content-Type(string, required): Must beapplication/json
Body:
- JSON array of usernames (strings)
- Examples:
["elonmusk", "axiaisacat"]
Response Structure
A successful response (HTTP 200) returns:
1{
2 "code": 200,
3 "msg": "success",
4 "data": [
5 {
6 "username": "elonmusk",
7 "dm_status": true
8 },
9 {
10 "username": "axiaisacat",
11 "dm_status": false
12 }
13 ]
14}Response Fields:
code(integer): HTTP status code (200 for success)msg(string): Response messagedata(array): Array of user DM status objectsusername(string): Twitter usernamedm_status(boolean):trueif DMs are open,falseif closed
Basic Implementation Examples
Answer: Basic Implementation Examples is implemented by calling the TwexAPI endpoint documented in this guide with a Bearer Token; batch or paginated requests reduce overhead to ~14 credits per call at 20+ QPS.
Let's start with simple examples of checking DM status using different programming languages.
Example 1: Python Implementation
Here's a complete Python implementation with error handling:
1import requests
2from typing import List, Dict, Optional
3
4def check_dm_status(usernames: List[str], api_token: str) -> Optional[Dict]:
5 """
6 Check DM status for multiple Twitter users
7
8 Args:
9 usernames: List of Twitter usernames to check
10 api_token: Your TwitterXApi bearer token
11
12 Returns:
13 Dictionary containing DM status results, or None if failed
14 """
15 url = "https://api.twexapi.io/dm/status"
16
17 headers = {
18 "Authorization": f"Bearer {api_token}",
19 "Content-Type": "application/json"
20 }
21
22 try:
23 response = requests.post(url, json=usernames, headers=headers, timeout=10)
24
25 if response.status_code == 200:
26 result = response.json()
27 print(f"✅ Successfully checked {len(result['data'])} users")
28 return result
29 elif response.status_code == 401:
30 print("❌ Authentication failed: Check your API token")
31 elif response.status_code == 422:
32 print("❌ Invalid request: Check username format")
33 else:
34 print(f"❌ Request failed: HTTP {response.status_code}")
35 print(response.text)
36
37 return None
38
39 except requests.exceptions.Timeout:
40 print("❌ Request timeout: Check your network connection")
41 return None
42 except requests.exceptions.RequestException as e:
43 print(f"❌ Request error: {str(e)}")
44 return None
45
46# Example usage
47def main():
48 # Replace with your actual API token
49 API_TOKEN = "YOUR_API_TOKEN_HERE"
50
51 # Users to check
52 usernames = ["elonmusk", "axiaisacat", "twitter"]
53
54 # Check DM status
55 result = check_dm_status(usernames, API_TOKEN)
56
57 if result:
58 print(f"\\n📊 DM Status Report:")
59 print(f"Status: {result['msg']}")
60 print(f"Total users checked: {len(result['data'])}\\n")
61
62 # Display results
63 for user in result['data']:
64 username = user['username']
65 status = user['dm_status']
66 status_icon = "✅" if status else "❌"
67 status_text = "Open" if status else "Closed"
68
69 print(f"{status_icon} @{username}: DMs {status_text}")
70
71if __name__ == "__main__":
72 main()Expected Output:
✅ Successfully checked 3 users
📊 DM Status Report:
Status: success
Total users checked: 3
✅ @elonmusk: DMs Open
❌ @axiaisacat: DMs Closed
✅ @twitter: DMs OpenExample 2: JavaScript/Node.js Implementation
1const axios = require('axios');
2
3/**
4 * Check DM status for multiple Twitter users
5 * @param {string[]} usernames - Array of Twitter usernames
6 * @param {string} apiToken - Your TwitterXApi bearer token
7 * @returns {Promise<Object|null>} DM status results
8 */
9async function checkDMStatus(usernames, apiToken) {
10 const url = 'https://api.twexapi.io/dm/status';
11
12 const headers = {
13 'Authorization': \`Bearer \${apiToken}\`,
14 'Content-Type': 'application/json'
15 };
16
17 try {
18 const response = await axios.post(url, usernames, {
19 headers,
20 timeout: 10000
21 });
22
23 console.log(\`✅ Successfully checked \${response.data.data.length} users\`);
24 return response.data;
25
26 } catch (error) {
27 if (error.response) {
28 // Server responded with error status
29 switch (error.response.status) {
30 case 401:
31 console.error('❌ Authentication failed: Check your API token');
32 break;
33 case 422:
34 console.error('❌ Invalid request: Check username format');
35 break;
36 default:
37 console.error(\`❌ Request failed: HTTP \${error.response.status}\`);
38 console.error(error.response.data);
39 }
40 } else if (error.request) {
41 console.error('❌ No response received: Check your network connection');
42 } else {
43 console.error(\`❌ Error: \${error.message}\`);
44 }
45 return null;
46 }
47}
48
49// Example usage
50async function main() {
51 // Replace with your actual API token
52 const API_TOKEN = 'YOUR_API_TOKEN_HERE';
53
54 // Users to check
55 const usernames = ['elonmusk', 'axiaisacat', 'twitter'];
56
57 // Check DM status
58 const result = await checkDMStatus(usernames, API_TOKEN);
59
60 if (result && result.data) {
61 console.log('\\n📊 DM Status Report:');
62 console.log(\`Status: \${result.msg}\`);
63 console.log(\`Total users checked: \${result.data.length}\\n\`);
64
65 // Display results
66 result.data.forEach(user => {
67 const statusIcon = user.dm_status ? '✅' : '❌';
68 const statusText = user.dm_status ? 'Open' : 'Closed';
69 console.log(\`\${statusIcon} @\${user.username}: DMs \${statusText}\`);
70 });
71 }
72}
73
74// Run the example
75main().catch(console.error);Example 3: cURL Command
For quick testing from the command line:
curl --request POST \\
--url https://api.twexapi.io/dm/status \\
--header 'Authorization: Bearer YOUR_API_TOKEN_HERE' \\
--header 'Content-Type: application/json' \\
--data '["elonmusk", "axiaisacat", "twitter"]'Advanced Use Cases
Answer: Advanced Use Cases means using TwexAPI Bearer APIs on api.twexapi.io for this user case—typically
14 credits per read ($0.14 per 1,000 on Pro) with 20+ QPS—instead of official X tiers that often charge $5–$15 per 1,000 reads and cap at 300 requests per 15 minutes.
Now let's explore more sophisticated applications of the DM Status API.
Use Case 1: Pre-filtering for Bulk DM Campaigns
Filter your contact list before sending bulk DMs:
1import requests
2from typing import List, Dict
3
4class DMCampaignManager:
5 def __init__(self, api_token: str):
6 self.api_token = api_token
7 self.dm_status_url = "https://api.twexapi.io/dm/status"
8
9 def get_dm_available_users(self, usernames: List[str]) -> List[str]:
10 """
11 Filter users who have DMs enabled
12
13 Args:
14 usernames: List of usernames to check
15
16 Returns:
17 List of usernames with DMs open
18 """
19 headers = {
20 "Authorization": f"Bearer {self.api_token}",
21 "Content-Type": "application/json"
22 }
23
24 try:
25 response = requests.post(
26 self.dm_status_url,
27 json=usernames,
28 headers=headers,
29 timeout=10
30 )
31
32 if response.status_code == 200:
33 result = response.json()
34
35 # Filter users with DMs open
36 available_users = [
37 user['username']
38 for user in result['data']
39 if user['dm_status']
40 ]
41
42 total = len(result['data'])
43 available = len(available_users)
44 closed = total - available
45
46 print(f"📊 DM Availability Summary:")
47 print(f" Total users: {total}")
48 print(f" ✅ DMs Open: {available} ({available/total*100:.1f}%)")
49 print(f" ❌ DMs Closed: {closed} ({closed/total*100:.1f}%)")
50
51 return available_users
52 else:
53 print(f"❌ API Error: {response.status_code}")
54 return []
55
56 except Exception as e:
57 print(f"❌ Error: {e}")
58 return []
59
60 def prepare_dm_campaign(
61 self,
62 target_users: List[str],
63 message_template: str
64 ) -> Dict:
65 """
66 Prepare a DM campaign by filtering available users
67
68 Args:
69 target_users: List of target usernames
70 message_template: DM message template
71
72 Returns:
73 Campaign preparation results
74 """
75 print(f"🚀 Preparing DM campaign for {len(target_users)} users...\\n")
76
77 # Check DM status
78 available_users = self.get_dm_available_users(target_users)
79
80 if not available_users:
81 print("\\n⚠️ No users available for messaging")
82 return {'success': False, 'available_users': []}
83
84 print(f"\\n✅ Campaign ready!")
85 print(f"Ready to send to: {', '.join(available_users[:5])}", end='')
86 if len(available_users) > 5:
87 print(f" and {len(available_users) - 5} more")
88 else:
89 print()
90
91 return {
92 'success': True,
93 'available_users': available_users,
94 'unavailable_count': len(target_users) - len(available_users),
95 'message': message_template
96 }
97
98# Example usage
99def main():
100 API_TOKEN = "YOUR_API_TOKEN_HERE"
101
102 # Initialize campaign manager
103 manager = DMCampaignManager(API_TOKEN)
104
105 # Target audience
106 target_audience = [
107 "elonmusk", "billgates", "twitter", "nasa",
108 "axiaisacat", "openai", "github", "vercel"
109 ]
110
111 # Message template
112 message = "Hi! We'd love to connect with you about our new product launch. 🚀"
113
114 # Prepare campaign
115 campaign = manager.prepare_dm_campaign(target_audience, message)
116
117 if campaign['success']:
118 print(f"\\n📤 You can now send {len(campaign['available_users'])} DMs")
119 print(f"💰 Saved {campaign['unavailable_count']} unnecessary API calls")
120
121if __name__ == "__main__":
122 main()Use Case 2: CRM Integration with DM Status Tracking
Integrate DM status checking into your CRM system:
1import requests
2import json
3from datetime import datetime
4from typing import List, Dict
5
6class CRMDMTracker:
7 def __init__(self, api_token: str):
8 self.api_token = api_token
9 self.dm_status_url = "https://api.twexapi.io/dm/status"
10 self.customer_db = {} # Simulated database
11
12 def update_customer_dm_status(self, customers: List[Dict]) -> Dict:
13 """
14 Update DM status for all customers in CRM
15
16 Args:
17 customers: List of customer dictionaries with 'username' field
18
19 Returns:
20 Update summary
21 """
22 usernames = [c['username'] for c in customers if c.get('username')]
23
24 if not usernames:
25 return {'success': False, 'message': 'No usernames found'}
26
27 print(f"🔄 Updating DM status for {len(usernames)} customers...")
28
29 headers = {
30 "Authorization": f"Bearer {self.api_token}",
31 "Content-Type": "application/json"
32 }
33
34 try:
35 response = requests.post(
36 self.dm_status_url,
37 json=usernames,
38 headers=headers,
39 timeout=15
40 )
41
42 if response.status_code == 200:
43 result = response.json()
44 updated_count = 0
45
46 # Update each customer record
47 for status_data in result['data']:
48 username = status_data['username']
49 dm_status = status_data['dm_status']
50
51 # Update in "database"
52 if username in self.customer_db:
53 self.customer_db[username].update({
54 'dm_status': dm_status,
55 'dm_status_checked_at': datetime.now().isoformat(),
56 'can_contact_via_dm': dm_status
57 })
58 updated_count += 1
59
60 print(f"✅ Updated {updated_count} customer records")
61
62 return {
63 'success': True,
64 'updated_count': updated_count,
65 'timestamp': datetime.now().isoformat()
66 }
67 else:
68 print(f"❌ API Error: {response.status_code}")
69 return {'success': False, 'error': response.text}
70
71 except Exception as e:
72 print(f"❌ Error: {e}")
73 return {'success': False, 'error': str(e)}
74
75 def get_contactable_customers(self, segment: str = 'all') -> List[Dict]:
76 """
77 Get list of customers who can be contacted via DM
78
79 Args:
80 segment: Customer segment to filter
81
82 Returns:
83 List of contactable customers
84 """
85 contactable = [
86 customer
87 for customer in self.customer_db.values()
88 if customer.get('can_contact_via_dm', False)
89 ]
90
91 print(f"\\n📋 Contactable Customers Report:")
92 print(f"Total in database: {len(self.customer_db)}")
93 print(f"Contactable via DM: {len(contactable)}")
94 print(f"Contact rate: {len(contactable)/len(self.customer_db)*100:.1f}%")
95
96 return contactable
97
98 def add_customer(self, username: str, **kwargs):
99 """Add customer to CRM database"""
100 self.customer_db[username] = {
101 'username': username,
102 'added_at': datetime.now().isoformat(),
103 **kwargs
104 }
105
106# Example usage
107def main():
108 API_TOKEN = "YOUR_API_TOKEN_HERE"
109
110 # Initialize CRM tracker
111 crm = CRMDMTracker(API_TOKEN)
112
113 # Add some customers to CRM
114 customers = [
115 {'username': 'elonmusk', 'name': 'Elon Musk', 'tier': 'premium'},
116 {'username': 'billgates', 'name': 'Bill Gates', 'tier': 'premium'},
117 {'username': 'axiaisacat', 'name': 'Axia', 'tier': 'standard'},
118 {'username': 'twitter', 'name': 'Twitter', 'tier': 'standard'},
119 ]
120
121 for customer in customers:
122 crm.add_customer(**customer)
123
124 # Update DM status for all customers
125 result = crm.update_customer_dm_status(customers)
126
127 if result['success']:
128 # Get contactable customers
129 contactable = crm.get_contactable_customers()
130
131 print("\\n👥 Customers you can message:")
132 for customer in contactable:
133 print(f" ✅ @{customer['username']} ({customer.get('tier', 'N/A')})")
134
135if __name__ == "__main__":
136 main()Use Case 3: Real-time DM Status Monitoring
Monitor DM status changes over time:
1import requests
2import time
3from datetime import datetime
4from typing import List, Dict, Set
5
6class DMStatusMonitor:
7 def __init__(self, api_token: str):
8 self.api_token = api_token
9 self.dm_status_url = "https://api.twexapi.io/dm/status"
10 self.status_history = {} # Track status changes
11
12 def check_status_changes(
13 self,
14 usernames: List[str]
15 ) -> Dict[str, Dict]:
16 """
17 Check for DM status changes
18
19 Args:
20 usernames: List of usernames to monitor
21
22 Returns:
23 Dictionary of status changes
24 """
25 headers = {
26 "Authorization": f"Bearer {self.api_token}",
27 "Content-Type": "application/json"
28 }
29
30 try:
31 response = requests.post(
32 self.dm_status_url,
33 json=usernames,
34 headers=headers,
35 timeout=10
36 )
37
38 if response.status_code == 200:
39 result = response.json()
40 changes = {}
41
42 for user_data in result['data']:
43 username = user_data['username']
44 current_status = user_data['dm_status']
45
46 # Check if status changed
47 if username in self.status_history:
48 previous_status = self.status_history[username]['status']
49
50 if previous_status != current_status:
51 change_type = 'opened' if current_status else 'closed'
52 changes[username] = {
53 'previous': previous_status,
54 'current': current_status,
55 'change_type': change_type,
56 'timestamp': datetime.now().isoformat()
57 }
58
59 print(f"🔔 ALERT: @{username} has {change_type} their DMs!")
60
61 # Update history
62 self.status_history[username] = {
63 'status': current_status,
64 'checked_at': datetime.now().isoformat()
65 }
66
67 return changes
68 else:
69 print(f"❌ API Error: {response.status_code}")
70 return {}
71
72 except Exception as e:
73 print(f"❌ Error: {e}")
74 return {}
75
76 def monitor_continuously(
77 self,
78 usernames: List[str],
79 interval_minutes: int = 5,
80 duration_hours: int = 1
81 ):
82 """
83 Continuously monitor DM status changes
84
85 Args:
86 usernames: Users to monitor
87 interval_minutes: Check interval in minutes
88 duration_hours: How long to monitor
89 """
90 print(f"🔍 Starting DM status monitoring...")
91 print(f"Monitoring {len(usernames)} users")
92 print(f"Check interval: {interval_minutes} minutes")
93 print(f"Duration: {duration_hours} hours")
94 print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\\n")
95
96 checks = 0
97 total_checks = (duration_hours * 60) // interval_minutes
98
99 while checks < total_checks:
100 checks += 1
101 print(f"📊 Check #{checks}/{total_checks} at {datetime.now().strftime('%H:%M:%S')}")
102
103 changes = self.check_status_changes(usernames)
104
105 if changes:
106 print(f"⚠️ Detected {len(changes)} status change(s)!")
107 for username, change in changes.items():
108 print(f" • @{username}: {change['change_type']}")
109 else:
110 print(" No changes detected")
111
112 if checks < total_checks:
113 print(f"💤 Sleeping for {interval_minutes} minutes...\\n")
114 time.sleep(interval_minutes * 60)
115
116 print(f"\\n✅ Monitoring completed after {checks} checks")
117 self.print_summary()
118
119 def print_summary(self):
120 """Print monitoring summary"""
121 print("\\n📈 Monitoring Summary:")
122 print(f"Users tracked: {len(self.status_history)}")
123
124 open_dms = sum(
125 1 for data in self.status_history.values()
126 if data['status']
127 )
128 closed_dms = len(self.status_history) - open_dms
129
130 print(f"Currently open: {open_dms}")
131 print(f"Currently closed: {closed_dms}")
132
133# Example usage
134def main():
135 API_TOKEN = "YOUR_API_TOKEN_HERE"
136
137 # Initialize monitor
138 monitor = DMStatusMonitor(API_TOKEN)
139
140 # Users to monitor
141 vip_users = [
142 "elonmusk", "billgates", "twitter",
143 "nasa", "openai", "github"
144 ]
145
146 # Option 1: Single check
147 print("=== Single Status Check ===")
148 changes = monitor.check_status_changes(vip_users)
149
150 # Option 2: Continuous monitoring (commented out for example)
151 # monitor.monitor_continuously(
152 # vip_users,
153 # interval_minutes=5,
154 # duration_hours=1
155 # )
156
157if __name__ == "__main__":
158 main()Best Practices
Answer: Best Practices means using TwexAPI Bearer APIs on api.twexapi.io for this user case—typically
14 credits per read ($0.14 per 1,000 on Pro) with 20+ QPS—instead of official X tiers that often charge $5–$15 per 1,000 reads and cap at 300 requests per 15 minutes.
1. Batch Processing for Efficiency
Always check multiple users in a single request to minimize API calls:
1def check_dm_status_in_batches(
2 all_usernames: List[str],
3 api_token: str,
4 batch_size: int = 50
5) -> List[Dict]:
6 """
7 Check DM status in batches for large user lists
8
9 Args:
10 all_usernames: Complete list of usernames
11 api_token: API bearer token
12 batch_size: Number of users per batch
13
14 Returns:
15 Combined results from all batches
16 """
17 all_results = []
18 total_batches = (len(all_usernames) + batch_size - 1) // batch_size
19
20 print(f"📦 Processing {len(all_usernames)} users in {total_batches} batches")
21
22 for i in range(0, len(all_usernames), batch_size):
23 batch = all_usernames[i:i + batch_size]
24 batch_num = (i // batch_size) + 1
25
26 print(f" Batch {batch_num}/{total_batches}: {len(batch)} users", end='')
27
28 result = check_dm_status(batch, api_token)
29
30 if result and 'data' in result:
31 all_results.extend(result['data'])
32 print(" ✅")
33 else:
34 print(" ❌")
35
36 # Rate limiting: wait between batches
37 if i + batch_size < len(all_usernames):
38 time.sleep(1)
39
40 print(f"\\n✅ Processed {len(all_results)} users total")
41 return all_results
42
43# Example: Check 150 users
44large_user_list = [f"user{i}" for i in range(1, 151)]
45results = check_dm_status_in_batches(large_user_list, API_TOKEN, batch_size=50)2. Caching to Reduce API Calls
Implement caching to avoid redundant checks:
1from datetime import datetime, timedelta
2from typing import Optional
3
4class DMStatusCache:
5 def __init__(self, cache_duration_hours: int = 24):
6 """
7 Initialize cache with configurable duration
8
9 Args:
10 cache_duration_hours: How long to cache results
11 """
12 self.cache = {}
13 self.cache_duration = timedelta(hours=cache_duration_hours)
14
15 def get(self, username: str) -> Optional[bool]:
16 """Get cached DM status if still valid"""
17 if username in self.cache:
18 status, timestamp = self.cache[username]
19 if datetime.now() - timestamp < self.cache_duration:
20 return status
21 return None
22
23 def set(self, username: str, dm_status: bool):
24 """Cache DM status with timestamp"""
25 self.cache[username] = (dm_status, datetime.now())
26
27 def invalidate(self, username: str):
28 """Invalidate cache for specific user"""
29 if username in self.cache:
30 del self.cache[username]
31
32 def clear(self):
33 """Clear entire cache"""
34 self.cache.clear()
35
36 def get_stats(self) -> Dict:
37 """Get cache statistics"""
38 now = datetime.now()
39 valid_entries = sum(
40 1 for _, timestamp in self.cache.values()
41 if now - timestamp < self.cache_duration
42 )
43
44 return {
45 'total_entries': len(self.cache),
46 'valid_entries': valid_entries,
47 'expired_entries': len(self.cache) - valid_entries
48 }
49
50# Usage with cache
51cache = DMStatusCache(cache_duration_hours=24)
52
53def check_dm_status_with_cache(
54 usernames: List[str],
55 api_token: str,
56 use_cache: bool = True
57) -> List[Dict]:
58 """Check DM status with caching support"""
59
60 uncached_users = []
61 results = []
62 cache_hits = 0
63
64 # Check cache first
65 if use_cache:
66 for username in usernames:
67 cached_status = cache.get(username)
68 if cached_status is not None:
69 results.append({
70 'username': username,
71 'dm_status': cached_status,
72 'cached': True
73 })
74 cache_hits += 1
75 else:
76 uncached_users.append(username)
77 else:
78 uncached_users = usernames
79
80 # Fetch uncached users from API
81 if uncached_users:
82 print(f"🔍 Checking {len(uncached_users)} users via API...")
83 api_result = check_dm_status(uncached_users, api_token)
84
85 if api_result and 'data' in api_result:
86 for user_data in api_result['data']:
87 # Cache the result
88 cache.set(user_data['username'], user_data['dm_status'])
89 user_data['cached'] = False
90 results.append(user_data)
91
92 if use_cache and cache_hits > 0:
93 print(f"⚡ Cache hits: {cache_hits}/{len(usernames)} ({cache_hits/len(usernames)*100:.1f}%)")
94 print(f"💰 Saved {cache_hits} API calls")
95
96 return results
97
98# Example usage
99usernames = ["elonmusk", "billgates", "twitter"]
100
101# First call - fetches from API
102print("=== First Check ===")
103results1 = check_dm_status_with_cache(usernames, API_TOKEN)
104
105# Second call - uses cache
106print("\\n=== Second Check (cached) ===")
107results2 = check_dm_status_with_cache(usernames, API_TOKEN)
108
109# Check cache stats
110stats = cache.get_stats()
111print(f"\\n📊 Cache Stats: {stats}")3. Error Handling and Retry Logic
Implement robust error handling:
1import time
2from typing import Optional, Dict, List
3
4def check_dm_status_with_retry(
5 usernames: List[str],
6 api_token: str,
7 max_retries: int = 3,
8 retry_delay: int = 2
9) -> Optional[Dict]:
10 """
11 Check DM status with automatic retry on failure
12
13 Args:
14 usernames: List of usernames to check
15 api_token: API bearer token
16 max_retries: Maximum number of retry attempts
17 retry_delay: Delay between retries in seconds
18
19 Returns:
20 API response or None if all retries failed
21 """
22 url = "https://api.twexapi.io/dm/status"
23 headers = {
24 "Authorization": f"Bearer {api_token}",
25 "Content-Type": "application/json"
26 }
27
28 for attempt in range(max_retries):
29 try:
30 print(f"🔄 Attempt {attempt + 1}/{max_retries}...")
31
32 response = requests.post(
33 url,
34 json=usernames,
35 headers=headers,
36 timeout=15
37 )
38
39 if response.status_code == 200:
40 print(f"✅ Success on attempt {attempt + 1}")
41 return response.json()
42
43 elif response.status_code == 429:
44 # Rate limit - wait longer
45 wait_time = retry_delay * (attempt + 1) * 2
46 print(f"⏱️ Rate limit hit, waiting {wait_time}s...")
47 time.sleep(wait_time)
48 continue
49
50 elif response.status_code == 401:
51 # Auth error - don't retry
52 print("❌ Authentication error: Check your API token")
53 return None
54
55 elif response.status_code == 422:
56 # Validation error - don't retry
57 print("❌ Validation error: Check username format")
58 print(response.text)
59 return None
60
61 else:
62 print(f"⚠️ HTTP {response.status_code}: {response.text}")
63
64 except requests.exceptions.Timeout:
65 print(f"⏱️ Request timeout on attempt {attempt + 1}")
66
67 except requests.exceptions.ConnectionError:
68 print(f"🔌 Connection error on attempt {attempt + 1}")
69
70 except requests.exceptions.RequestException as e:
71 print(f"❌ Request error: {e}")
72
73 except Exception as e:
74 print(f"❌ Unexpected error: {e}")
75 return None
76
77 # Wait before retry (except on last attempt)
78 if attempt < max_retries - 1:
79 wait = retry_delay * (2 ** attempt) # Exponential backoff
80 print(f"💤 Waiting {wait}s before retry...")
81 time.sleep(wait)
82
83 print(f"❌ All {max_retries} attempts failed")
84 return None
85
86# Example usage with retry
87usernames = ["elonmusk", "billgates", "twitter"]
88result = check_dm_status_with_retry(
89 usernames,
90 API_TOKEN,
91 max_retries=3,
92 retry_delay=2
93)Security Best Practices
Answer: Security Best Practices means using TwexAPI Bearer APIs on api.twexapi.io for this user case—typically
14 credits per read ($0.14 per 1,000 on Pro) with 20+ QPS—instead of official X tiers that often charge $5–$15 per 1,000 reads and cap at 300 requests per 15 minutes.
Protecting Your API Token
1import os
2from dotenv import load_dotenv
3
4# Load environment variables from .env file
5load_dotenv()
6
7# Get API token from environment variable
8API_TOKEN = os.getenv('TWITTERXAPI_TOKEN')
9
10if not API_TOKEN:
11 raise ValueError("TWITTERXAPI_TOKEN not found in environment variables")
12
13# Use the token
14result = check_dm_status(["elonmusk"], API_TOKEN)Create a .env file:
# TwitterXApi Configuration
TWITTERXAPI_TOKEN=your_actual_token_hereAdd to .gitignore:
.env
*.env
.env.local
.env.productionGetting Your API Token
Answer: Getting Your API Token is implemented by calling the TwexAPI endpoint documented in this guide with a Bearer Token; batch or paginated requests reduce overhead to ~14 credits per call at 20+ QPS.
To use TwitterXApi, you need a Bearer token:
- Sign up at TwitterXApi.com
- Navigate to your dashboard
- Create a new API key
- Copy your Bearer token
- Store it securely in environment variables
Real-World Implementation Example
Answer: Real-World Implementation Example is implemented by calling the TwexAPI endpoint documented in this guide with a Bearer Token; batch or paginated requests reduce overhead to ~14 credits per call at 20+ QPS.
Here's a complete, production-ready implementation:
1"""
2TwitterXApi DM Status Checker
3A production-ready implementation with all best practices
4"""
5
6import os
7import requests
8import time
9import logging
10from datetime import datetime, timedelta
11from typing import List, Dict, Optional
12from dataclasses import dataclass
13from dotenv import load_dotenv
14
15# Configure logging
16logging.basicConfig(
17 level=logging.INFO,
18 format='%(asctime)s - %(levelname)s - %(message)s'
19)
20logger = logging.getLogger(__name__)
21
22@dataclass
23class DMStatusResult:
24 """Data class for DM status results"""
25 username: str
26 dm_status: bool
27 checked_at: datetime
28 cached: bool = False
29
30class DMStatusChecker:
31 """
32 Production-ready DM status checker with caching,
33 error handling, and retry logic
34 """
35
36 def __init__(
37 self,
38 api_token: Optional[str] = None,
39 cache_duration_hours: int = 24,
40 max_retries: int = 3
41 ):
42 """
43 Initialize DM status checker
44
45 Args:
46 api_token: TwitterXApi bearer token (or from env)
47 cache_duration_hours: Cache validity duration
48 max_retries: Maximum retry attempts
49 """
50 # Load API token
51 if api_token is None:
52 load_dotenv()
53 api_token = os.getenv('TWITTERXAPI_TOKEN')
54
55 if not api_token:
56 raise ValueError("API token required")
57
58 self.api_token = api_token
59 self.url = "https://api.twexapi.io/dm/status"
60 self.cache = {}
61 self.cache_duration = timedelta(hours=cache_duration_hours)
62 self.max_retries = max_retries
63
64 logger.info("DMStatusChecker initialized")
65
66 def check(
67 self,
68 usernames: List[str],
69 use_cache: bool = True
70 ) -> List[DMStatusResult]:
71 """
72 Check DM status for users
73
74 Args:
75 usernames: List of Twitter usernames
76 use_cache: Whether to use cached results
77
78 Returns:
79 List of DMStatusResult objects
80 """
81 if not usernames:
82 logger.warning("Empty username list provided")
83 return []
84
85 logger.info(f"Checking DM status for {len(usernames)} users")
86
87 # Separate cached and uncached users
88 results = []
89 uncached = []
90
91 if use_cache:
92 for username in usernames:
93 cached_result = self._get_from_cache(username)
94 if cached_result:
95 results.append(cached_result)
96 else:
97 uncached.append(username)
98
99 if results:
100 logger.info(f"Cache hits: {len(results)}/{len(usernames)}")
101 else:
102 uncached = usernames
103
104 # Fetch uncached users from API
105 if uncached:
106 api_results = self._fetch_from_api(uncached)
107 if api_results:
108 results.extend(api_results)
109
110 logger.info(f"Total results: {len(results)}")
111 return results
112
113 def _get_from_cache(self, username: str) -> Optional[DMStatusResult]:
114 """Get result from cache if valid"""
115 if username in self.cache:
116 result, timestamp = self.cache[username]
117 if datetime.now() - timestamp < self.cache_duration:
118 return DMStatusResult(
119 username=username,
120 dm_status=result,
121 checked_at=timestamp,
122 cached=True
123 )
124 return None
125
126 def _set_cache(self, username: str, dm_status: bool):
127 """Store result in cache"""
128 self.cache[username] = (dm_status, datetime.now())
129
130 def _fetch_from_api(
131 self,
132 usernames: List[str]
133 ) -> List[DMStatusResult]:
134 """Fetch DM status from API with retry logic"""
135 headers = {
136 "Authorization": f"Bearer {self.api_token}",
137 "Content-Type": "application/json"
138 }
139
140 for attempt in range(self.max_retries):
141 try:
142 logger.info(f"API request attempt {attempt + 1}/{self.max_retries}")
143
144 response = requests.post(
145 self.url,
146 json=usernames,
147 headers=headers,
148 timeout=15
149 )
150
151 if response.status_code == 200:
152 data = response.json()
153 results = []
154
155 for user_data in data.get('data', []):
156 username = user_data['username']
157 dm_status = user_data['dm_status']
158
159 # Cache the result
160 self._set_cache(username, dm_status)
161
162 # Create result object
163 results.append(DMStatusResult(
164 username=username,
165 dm_status=dm_status,
166 checked_at=datetime.now(),
167 cached=False
168 ))
169
170 logger.info(f"Successfully fetched {len(results)} results")
171 return results
172
173 elif response.status_code == 429:
174 wait_time = 60 * (attempt + 1)
175 logger.warning(f"Rate limit hit, waiting {wait_time}s")
176 time.sleep(wait_time)
177
178 elif response.status_code in [401, 422]:
179 logger.error(f"API error {response.status_code}: {response.text}")
180 return []
181
182 else:
183 logger.warning(f"HTTP {response.status_code}: {response.text}")
184
185 except requests.exceptions.RequestException as e:
186 logger.error(f"Request failed: {e}")
187
188 # Exponential backoff
189 if attempt < self.max_retries - 1:
190 wait = 2 ** attempt
191 logger.info(f"Waiting {wait}s before retry")
192 time.sleep(wait)
193
194 logger.error("All retry attempts failed")
195 return []
196
197 def get_cache_stats(self) -> Dict:
198 """Get cache statistics"""
199 now = datetime.now()
200 valid = sum(
201 1 for _, timestamp in self.cache.values()
202 if now - timestamp < self.cache_duration
203 )
204
205 return {
206 'total_entries': len(self.cache),
207 'valid_entries': valid,
208 'expired_entries': len(self.cache) - valid
209 }
210
211 def clear_cache(self):
212 """Clear the cache"""
213 self.cache.clear()
214 logger.info("Cache cleared")
215
216# Example usage
217def main():
218 """Example usage of DMStatusChecker"""
219
220 # Initialize checker
221 checker = DMStatusChecker(
222 cache_duration_hours=24,
223 max_retries=3
224 )
225
226 # Test users
227 usernames = [
228 "elonmusk", "billgates", "twitter",
229 "nasa", "openai", "github"
230 ]
231
232 # Check DM status
233 results = checker.check(usernames, use_cache=True)
234
235 # Display results
236 print("\\n" + "="*60)
237 print("DM STATUS REPORT".center(60))
238 print("="*60 + "\\n")
239
240 open_count = 0
241 closed_count = 0
242
243 for result in results:
244 status_emoji = "✅" if result.dm_status else "❌"
245 status_text = "Open" if result.dm_status else "Closed"
246 cache_indicator = "📦" if result.cached else "🌐"
247
248 print(f"{status_emoji} @{result.username:15} DMs: {status_text:8} {cache_indicator}")
249
250 if result.dm_status:
251 open_count += 1
252 else:
253 closed_count += 1
254
255 # Summary
256 print("\\n" + "-"*60)
257 print(f"Total Users: {len(results)}")
258 print(f"DMs Open: {open_count} ({open_count/len(results)*100:.1f}%)")
259 print(f"DMs Closed: {closed_count} ({closed_count/len(results)*100:.1f}%)")
260 print("-"*60)
261
262 # Cache stats
263 cache_stats = checker.get_cache_stats()
264 print(f"\\nCache: {cache_stats['valid_entries']} valid entries")
265 print("="*60)
266
267if __name__ == "__main__":
268 main()Conclusion
TwitterXApi's DM Status endpoint is an essential tool for anyone building Twitter automation, social media management systems, or customer engagement applications. By checking DM status before attempting to send messages, you can:
- ✅ Improve efficiency by avoiding failed message attempts
- 💰 Reduce costs by minimizing unnecessary API calls
- 🎯 Better targeting with accurate contact information
- 🚀 Enhance automation with smarter bot logic
- 📊 Better analytics with DM availability tracking
Key Takeaways
- Batch Processing: Always check multiple users at once
- Caching: Implement caching to reduce API calls
- Error Handling: Use retry logic and proper error handling
- Security: Store API tokens in environment variables
- Monitoring: Track DM status changes over time
- Integration: Combine with CRM and marketing tools
Getting Started
- Sign up at TwitterXApi.com
- Get your Bearer token from the dashboard
- Install required packages (
requests,python-dotenv) - Implement basic checking functionality
- Add caching and error handling
- Scale up to production use
Next Steps
- Explore other TwitterXApi endpoints for comprehensive automation
- Build integrated social media management dashboards
- Implement real-time monitoring systems
- Create intelligent chatbots and customer service tools
For more information, check out:
Start building smarter Twitter automation today with TwitterXApi's DM Status checking!
This guide is based on TwitterXApi's documentation as of January 2025. Always refer to the official documentation for the most up-to-date information.