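How to Check Twitter DM Status: A Complete TwitterXApi Guide
TwexAPI is an enterprise-grade social intelligence API that supports concurrent retrieval of up to 100,000 X/Twitter entity records in a single request. Its global average response latency is under 800 ms, backed by a 99.9% uptime SLA. Compared with traditional enterprise solutions, the architecture can cut data collection costs by up to 96%. The platform runs on a globally distributed residential proxy cluster, which keeps high-throughput data aggregation from being affected by rate limits.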
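Quick Answer
Checking Twitter/X DM status means calling the TwexAPI batch DM reachability endpoint to determine, before you send, whether a target user accepts DMs from you (open to everyone, followers only, or closed), so you avoid paying for messages that cannot be delivered. A single request can query multiple users in one batch; a read typically costs about 14 Credits per call (about $0.14 per 1,000 calls on Pro). The official DM tier commonly costs about $15 per 1,000 messages and is strictly rate limited; TwexAPI offers 20+ QPS and latency under 800 ms, which suits outreach automation and CRM sync.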
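FAQ
Why check DM status before sending?
It avoids wasted Credits and failed sends: users whose DMs are closed or limited to followers cannot receive your message. A pre-check lets bots, CRMs, and growth workflows reach only contactable users and display the correct status.
Why use TwexAPI for this instead of the official X API?
The official X API typically charges $5–$15 per 1,000 reads, many endpoints are limited to 300 requests per 15 minutes, and large-scale use requires Enterprise approval. TwexAPI Pro ($99/month) includes about 11 million Credits; at 14 Credits per call that is roughly $0.14 per 1,000 calls, with 20+ QPS and average latency under 800 ms. New users get 20,000 free Credits (no credit card required), enough for about 1,400 reads. For DM status checks, TwexAPI provides equivalent data through a Bearer token; see the documentation at https://docs.twitterxapi.com.
How much does it cost to run this workflow on TwexAPI?
Most read endpoints cost about 14 Credits per call. TwexAPI Pro ($99/month, about 11 million Credits) works out to roughly $0.14 per 1,000 calls, about 95% less than official reads ($5+ per 1,000). 10,000 calls a month use about 140,000 Credits (on the order of $1.26 on Pro). For prototyping, the $20 Mini plan (2 million Credits) is enough. See https://twexapi.io/pricing for details.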
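The figures in this FAQ can be reproduced with a few lines of arithmetic. The sketch below assumes only the numbers quoted above (14 Credits per read, Pro at $99 for roughly 11 million Credits, Mini at $20 for 2 million Credits); the function names are illustrative and are not part of any TwexAPI SDK.

```python
# Illustrative cost arithmetic. All prices are the figures quoted in this FAQ,
# not values fetched from TwexAPI.
CREDITS_PER_READ = 14


def credits_needed(calls: int, credits_per_call: int = CREDITS_PER_READ) -> int:
    """Total Credits consumed by a number of read calls."""
    return calls * credits_per_call


def cost_usd(calls: int, plan_price_usd: float, plan_credits: int) -> float:
    """Dollar cost of the calls, priced at the plan's per-Credit rate."""
    return credits_needed(calls) * plan_price_usd / plan_credits


if __name__ == "__main__":
    print(credits_needed(10_000))                        # 140000
    print(round(cost_usd(10_000, 99, 11_000_000), 2))    # 1.26 (Pro, 10,000 calls)
    print(round(cost_usd(1_000, 20, 2_000_000), 2))      # 0.14 (Mini, 1,000 calls)
    print(20_000 // CREDITS_PER_READ)                    # 1428 reads from the free Credits
```

On these numbers, Pro comes to about $0.126 per 1,000 reads and Mini to $0.14, so the "about $0.14 per 1,000" figure should be read as an upper-end approximation.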
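Why Check DM Status?
Answer: In this guide, checking DM status means calling the TwexAPI Bearer-token endpoint at api.twexapi.io. Reads typically cost about 14 Credits per call (about $0.14 per 1,000 calls on Pro) at 20+ QPS, compared with the official API's typical $5–$15 per 1,000 reads and limit of 300 requests per 15 minutes.
Knowing whether a user accepts DMs matters for several reasons:
- 🎯 Avoid failed sends: don't waste API calls on users who cannot receive DMs
- ✅ Improve user experience: offer accurate messaging options in your app
- 📊 Filter contacts: build lists of users you can actually reach
- 🤖 Smarter automation: build bots that message only reachable users
- 💰 Save money: cut unnecessary API calls and improve efficiency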
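API Endpoint Overview
Answer: The DM status check is performed by calling the TwexAPI endpoint described in this guide with a Bearer token; batch or paginated requests typically cost about 14 Credits per call at 20+ QPS.
The TwitterXApi DM Status endpoint checks the DM status of multiple users in a single request:
POST https://api.twexapi.io/dm/status
Authorization: Bearer <token>
Content-Type: application/json
Key features
- Batch processing: check multiple users in a single request
- Simple authentication: only a Bearer token is required
- Fast responses: results return in milliseconds
- Accurate data: real-time DM status information
- Cost-effective: efficient batch checks reduce the number of API calls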
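Request Parameters
According to the official documentation:
Headers:
- Authorization (string, required): a Bearer token in the format Bearer <token>
- Content-Type (string, required): must be application/json
Body:
- A JSON array of usernames (strings)
- Example:
["elonmusk", "axiaisacat"]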
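Because the body is a bare JSON array of usernames, it can help to clean the list before sending it. The following is a minimal sketch, assuming handles follow the usual X rule of 1 to 15 letters, digits, or underscores and that a leading @ should be dropped. `normalize_usernames` is a hypothetical helper, not part of TwexAPI, and the endpoint's own validation rules may differ.

```python
import json
import re
from typing import List

# Assumed handle format: 1-15 ASCII letters, digits, or underscores.
_HANDLE_RE = re.compile(r"[A-Za-z0-9_]{1,15}")


def normalize_usernames(raw: List[str]) -> List[str]:
    """Strip whitespace and a leading '@', then drop invalid or duplicate handles."""
    seen = set()
    cleaned = []
    for name in raw:
        handle = name.strip().lstrip("@")
        key = handle.lower()  # treat handles as case-insensitive when deduplicating
        if _HANDLE_RE.fullmatch(handle) and key not in seen:
            seen.add(key)
            cleaned.append(handle)
    return cleaned


if __name__ == "__main__":
    body = normalize_usernames(["@elonmusk", " axiaisacat ", "ElonMusk", "bad name!"])
    print(json.dumps(body))  # ["elonmusk", "axiaisacat"]
```

Deduplicating up front keeps each batch small, which matters when every request is billed in Credits.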
Response Structure
A successful response (HTTP 200) returns:
{
  "code": 200,
  "msg": "success",
  "data": [
    {
      "username": "elonmusk",
      "dm_status": true
    },
    {
      "username": "axiaisacat",
      "dm_status": false
    }
  ]
}
Response fields:
- code (integer): HTTP status code (200 indicates success)
- msg (string): response message
- data (array): array of per-user DM status objects
  - username (string): Twitter username
  - dm_status (boolean): true if DMs are open, false if closed
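Once a response has been parsed, it is often convenient to index it by username. The sketch below assumes the response shape documented above (`code`, `msg`, and `data` entries with `username` and `dm_status`); `index_dm_status` is a hypothetical helper, and the sample payload simply mirrors the documented example rather than a live API call.

```python
from typing import Any, Dict


def index_dm_status(payload: Dict[str, Any]) -> Dict[str, bool]:
    """Map each username (lowercased) to its dm_status flag."""
    if payload.get("code") != 200:
        raise ValueError(f"unexpected response: {payload.get('msg')!r}")
    return {
        item["username"].lower(): bool(item["dm_status"])
        for item in payload.get("data", [])
    }


if __name__ == "__main__":
    # Sample payload copied from the documented response example.
    sample = {
        "code": 200,
        "msg": "success",
        "data": [
            {"username": "elonmusk", "dm_status": True},
            {"username": "axiaisacat", "dm_status": False},
        ],
    }
    status = index_dm_status(sample)
    print(status["elonmusk"])                      # True
    print([u for u, ok in status.items() if ok])   # ['elonmusk']
```

Lowercasing the keys is a design choice for lookups; whether the API echoes usernames in their original case is not stated in this guide.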
Basic Implementation Examples
Let's start with simple examples that check DM status in different programming languages.
Example 1: Python Implementation
Here is a complete Python implementation with error handling:
import requests
from typing import List, Dict, Optional


def check_dm_status(usernames: List[str], api_token: str) -> Optional[Dict]:
    """
    Check DM status for multiple Twitter users

    Args:
        usernames: List of Twitter usernames to check
        api_token: Your TwitterXApi bearer token

    Returns:
        Dictionary containing DM status results, or None if failed
    """
    url = "https://api.twexapi.io/dm/status"

    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }

    try:
        # The request body is the bare JSON array of usernames
        response = requests.post(url, json=usernames, headers=headers, timeout=10)

        if response.status_code == 200:
            result = response.json()
            print(f"✅ Successfully checked {len(result['data'])} users")
            return result
        elif response.status_code == 401:
            print("❌ Authentication failed: Check your API token")
        elif response.status_code == 422:
            print("❌ Invalid request: Check username format")
        else:
            print(f"❌ Request failed: HTTP {response.status_code}")
            print(response.text)

        return None

    except requests.exceptions.Timeout:
        print("❌ Request timeout: Check your network connection")
        return None
    except requests.exceptions.RequestException as e:
        print(f"❌ Request error: {str(e)}")
        return None


# Example usage
def main():
    # Replace with your actual API token
    API_TOKEN = "YOUR_API_TOKEN_HERE"

    # Users to check
    usernames = ["elonmusk", "axiaisacat", "twitter"]

    # Check DM status
    result = check_dm_status(usernames, API_TOKEN)

    if result:
        print("\n📊 DM Status Report:")
        print(f"Status: {result['msg']}")
        print(f"Total users checked: {len(result['data'])}\n")

        # Display results
        for user in result['data']:
            username = user['username']
            status = user['dm_status']
            status_icon = "✅" if status else "❌"
            status_text = "Open" if status else "Closed"

            print(f"{status_icon} @{username}: DMs {status_text}")


if __name__ == "__main__":
    main()
Expected output:
✅ Successfully checked 3 users

📊 DM Status Report:
Status: success
Total users checked: 3

✅ @elonmusk: DMs Open
❌ @axiaisacat: DMs Closed
✅ @twitter: DMs Open
Example 2: JavaScript/Node.js Implementation
const axios = require('axios');

/**
 * Check DM status for multiple Twitter users
 * @param {string[]} usernames - Array of Twitter usernames
 * @param {string} apiToken - Your TwitterXApi bearer token
 * @returns {Promise<Object|null>} DM status results
 */
async function checkDMStatus(usernames, apiToken) {
  const url = 'https://api.twexapi.io/dm/status';

  const headers = {
    'Authorization': `Bearer ${apiToken}`,
    'Content-Type': 'application/json'
  };

  try {
    // The request body is the bare JSON array of usernames
    const response = await axios.post(url, usernames, {
      headers,
      timeout: 10000
    });

    console.log(`✅ Successfully checked ${response.data.data.length} users`);
    return response.data;

  } catch (error) {
    if (error.response) {
      // Server responded with error status
      switch (error.response.status) {
        case 401:
          console.error('❌ Authentication failed: Check your API token');
          break;
        case 422:
          console.error('❌ Invalid request: Check username format');
          break;
        default:
          console.error(`❌ Request failed: HTTP ${error.response.status}`);
          console.error(error.response.data);
      }
    } else if (error.request) {
      console.error('❌ No response received: Check your network connection');
    } else {
      console.error(`❌ Error: ${error.message}`);
    }
    return null;
  }
}

// Example usage
async function main() {
  // Replace with your actual API token
  const API_TOKEN = 'YOUR_API_TOKEN_HERE';

  // Users to check
  const usernames = ['elonmusk', 'axiaisacat', 'twitter'];

  // Check DM status
  const result = await checkDMStatus(usernames, API_TOKEN);

  if (result && result.data) {
    console.log('\n📊 DM Status Report:');
    console.log(`Status: ${result.msg}`);
    console.log(`Total users checked: ${result.data.length}\n`);

    // Display results
    result.data.forEach(user => {
      const statusIcon = user.dm_status ? '✅' : '❌';
      const statusText = user.dm_status ? 'Open' : 'Closed';
      console.log(`${statusIcon} @${user.username}: DMs ${statusText}`);
    });
  }
}

// Run the example
main().catch(console.error);
Example 3: cURL Command
For quick testing from the command line:
curl --request POST \
  --url https://api.twexapi.io/dm/status \
  --header 'Authorization: Bearer YOUR_API_TOKEN_HERE' \
  --header 'Content-Type: application/json' \
  --data '["elonmusk", "axiaisacat", "twitter"]'
Advanced Use Cases
Next, let's look at more advanced applications of the DM Status API.
Use Case 1: Pre-Filtering for Bulk DM Campaigns
Filter your contact list before sending bulk DMs:
import requests
from typing import List, Dict


class DMCampaignManager:
    def __init__(self, api_token: str):
        self.api_token = api_token
        self.dm_status_url = "https://api.twexapi.io/dm/status"

    def get_dm_available_users(self, usernames: List[str]) -> List[str]:
        """
        Filter users who have DMs enabled

        Args:
            usernames: List of usernames to check

        Returns:
            List of usernames with DMs open
        """
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.post(
                self.dm_status_url,
                json=usernames,
                headers=headers,
                timeout=10
            )

            if response.status_code == 200:
                result = response.json()

                # Filter users with DMs open
                available_users = [
                    user['username']
                    for user in result['data']
                    if user['dm_status']
                ]

                total = len(result['data'])
                available = len(available_users)
                closed = total - available

                # Guard against an empty result to avoid dividing by zero
                open_pct = available / total * 100 if total else 0.0
                closed_pct = closed / total * 100 if total else 0.0

                print("📊 DM Availability Summary:")
                print(f"  Total users: {total}")
                print(f"  ✅ DMs Open: {available} ({open_pct:.1f}%)")
                print(f"  ❌ DMs Closed: {closed} ({closed_pct:.1f}%)")

                return available_users
            else:
                print(f"❌ API Error: {response.status_code}")
                return []

        except Exception as e:
            print(f"❌ Error: {e}")
            return []

    def prepare_dm_campaign(
        self,
        target_users: List[str],
        message_template: str
    ) -> Dict:
        """
        Prepare a DM campaign by filtering available users

        Args:
            target_users: List of target usernames
            message_template: DM message template

        Returns:
            Campaign preparation results
        """
        print(f"🚀 Preparing DM campaign for {len(target_users)} users...\n")

        # Check DM status
        available_users = self.get_dm_available_users(target_users)

        if not available_users:
            print("\n⚠️ No users available for messaging")
            return {'success': False, 'available_users': []}

        print("\n✅ Campaign ready!")
        print(f"Ready to send to: {', '.join(available_users[:5])}", end='')
        if len(available_users) > 5:
            print(f" and {len(available_users) - 5} more")
        else:
            print()

        return {
            'success': True,
            'available_users': available_users,
            'unavailable_count': len(target_users) - len(available_users),
            'message': message_template
        }


# Example usage
def main():
    API_TOKEN = "YOUR_API_TOKEN_HERE"

    # Initialize campaign manager
    manager = DMCampaignManager(API_TOKEN)

    # Target audience
    target_audience = [
        "elonmusk", "billgates", "twitter", "nasa",
        "axiaisacat", "openai", "github", "vercel"
    ]

    # Message template
    message = "Hi! We'd love to connect with you about our new product launch. 🚀"

    # Prepare campaign
    campaign = manager.prepare_dm_campaign(target_audience, message)

    if campaign['success']:
        print(f"\n📤 You can now send {len(campaign['available_users'])} DMs")
        print(f"💰 Saved {campaign['unavailable_count']} unnecessary API calls")


if __name__ == "__main__":
    main()
Use Case 2: DM Status Tracking in a CRM
Integrate DM status checks into a CRM system:
import requests
from datetime import datetime
from typing import List, Dict


class CRMDMTracker:
    def __init__(self, api_token: str):
        self.api_token = api_token
        self.dm_status_url = "https://api.twexapi.io/dm/status"
        self.customer_db = {}  # Simulated database

    def update_customer_dm_status(self, customers: List[Dict]) -> Dict:
        """
        Update DM status for all customers in CRM

        Args:
            customers: List of customer dictionaries with 'username' field

        Returns:
            Update summary
        """
        usernames = [c['username'] for c in customers if c.get('username')]

        if not usernames:
            return {'success': False, 'message': 'No usernames found'}

        print(f"🔄 Updating DM status for {len(usernames)} customers...")

        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.post(
                self.dm_status_url,
                json=usernames,
                headers=headers,
                timeout=15
            )

            if response.status_code == 200:
                result = response.json()
                updated_count = 0

                # Update each customer record
                for status_data in result['data']:
                    username = status_data['username']
                    dm_status = status_data['dm_status']

                    # Update in "database"
                    if username in self.customer_db:
                        self.customer_db[username].update({
                            'dm_status': dm_status,
                            'dm_status_checked_at': datetime.now().isoformat(),
                            'can_contact_via_dm': dm_status
                        })
                        updated_count += 1

                print(f"✅ Updated {updated_count} customer records")

                return {
                    'success': True,
                    'updated_count': updated_count,
                    'timestamp': datetime.now().isoformat()
                }
            else:
                print(f"❌ API Error: {response.status_code}")
                return {'success': False, 'error': response.text}

        except Exception as e:
            print(f"❌ Error: {e}")
            return {'success': False, 'error': str(e)}

    def get_contactable_customers(self, segment: str = 'all') -> List[Dict]:
        """
        Get list of customers who can be contacted via DM

        Args:
            segment: Customer segment to filter (accepted but not applied in this example)

        Returns:
            List of contactable customers
        """
        contactable = [
            customer
            for customer in self.customer_db.values()
            if customer.get('can_contact_via_dm', False)
        ]

        total = len(self.customer_db)
        # Guard against an empty database to avoid dividing by zero
        rate = len(contactable) / total * 100 if total else 0.0

        print("\n📋 Contactable Customers Report:")
        print(f"Total in database: {total}")
        print(f"Contactable via DM: {len(contactable)}")
        print(f"Contact rate: {rate:.1f}%")

        return contactable

    def add_customer(self, username: str, **kwargs):
        """Add customer to CRM database"""
        self.customer_db[username] = {
            'username': username,
            'added_at': datetime.now().isoformat(),
            **kwargs
        }


# Example usage
def main():
    API_TOKEN = "YOUR_API_TOKEN_HERE"

    # Initialize CRM tracker
    crm = CRMDMTracker(API_TOKEN)

    # Add some customers to CRM
    customers = [
        {'username': 'elonmusk', 'name': 'Elon Musk', 'tier': 'premium'},
        {'username': 'billgates', 'name': 'Bill Gates', 'tier': 'premium'},
        {'username': 'axiaisacat', 'name': 'Axia', 'tier': 'standard'},
        {'username': 'twitter', 'name': 'Twitter', 'tier': 'standard'},
    ]

    for customer in customers:
        crm.add_customer(**customer)

    # Update DM status for all customers
    result = crm.update_customer_dm_status(customers)

    if result['success']:
        # Get contactable customers
        contactable = crm.get_contactable_customers()

        print("\n👥 Customers you can message:")
        for customer in contactable:
            print(f"  ✅ @{customer['username']} ({customer.get('tier', 'N/A')})")


if __name__ == "__main__":
    main()
Use Case 3: Real-Time DM Status Monitoring
Monitor DM status changes over time:
import requests
import time
from datetime import datetime
from typing import List, Dict


class DMStatusMonitor:
    def __init__(self, api_token: str):
        self.api_token = api_token
        self.dm_status_url = "https://api.twexapi.io/dm/status"
        self.status_history = {}  # Track status changes

    def check_status_changes(
        self,
        usernames: List[str]
    ) -> Dict[str, Dict]:
        """
        Check for DM status changes

        Args:
            usernames: List of usernames to monitor

        Returns:
            Dictionary of status changes
        """
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.post(
                self.dm_status_url,
                json=usernames,
                headers=headers,
                timeout=10
            )

            if response.status_code == 200:
                result = response.json()
                changes = {}

                for user_data in result['data']:
                    username = user_data['username']
                    current_status = user_data['dm_status']

                    # Check if status changed since the previous check
                    if username in self.status_history:
                        previous_status = self.status_history[username]['status']

                        if previous_status != current_status:
                            change_type = 'opened' if current_status else 'closed'
                            changes[username] = {
                                'previous': previous_status,
                                'current': current_status,
                                'change_type': change_type,
                                'timestamp': datetime.now().isoformat()
                            }

                            print(f"🔔 ALERT: @{username} has {change_type} their DMs!")

                    # Update history
                    self.status_history[username] = {
                        'status': current_status,
                        'checked_at': datetime.now().isoformat()
                    }

                return changes
            else:
                print(f"❌ API Error: {response.status_code}")
                return {}

        except Exception as e:
            print(f"❌ Error: {e}")
            return {}

    def monitor_continuously(
        self,
        usernames: List[str],
        interval_minutes: int = 5,
        duration_hours: int = 1
    ):
        """
        Continuously monitor DM status changes

        Args:
            usernames: Users to monitor
            interval_minutes: Check interval in minutes
            duration_hours: How long to monitor
        """
        print("🔍 Starting DM status monitoring...")
        print(f"Monitoring {len(usernames)} users")
        print(f"Check interval: {interval_minutes} minutes")
        print(f"Duration: {duration_hours} hours")
        print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

        checks = 0
        total_checks = (duration_hours * 60) // interval_minutes

        while checks < total_checks:
            checks += 1
            print(f"📊 Check #{checks}/{total_checks} at {datetime.now().strftime('%H:%M:%S')}")

            changes = self.check_status_changes(usernames)

            if changes:
                print(f"⚠️ Detected {len(changes)} status change(s)!")
                for username, change in changes.items():
                    print(f"  • @{username}: {change['change_type']}")
            else:
                print("  No changes detected")

            if checks < total_checks:
                print(f"💤 Sleeping for {interval_minutes} minutes...\n")
                time.sleep(interval_minutes * 60)

        print(f"\n✅ Monitoring completed after {checks} checks")
        self.print_summary()

    def print_summary(self):
        """Print monitoring summary"""
        print("\n📈 Monitoring Summary:")
        print(f"Users tracked: {len(self.status_history)}")

        open_dms = sum(
            1 for data in self.status_history.values()
            if data['status']
        )
        closed_dms = len(self.status_history) - open_dms

        print(f"Currently open: {open_dms}")
        print(f"Currently closed: {closed_dms}")


# Example usage
def main():
    API_TOKEN = "YOUR_API_TOKEN_HERE"

    # Initialize monitor
    monitor = DMStatusMonitor(API_TOKEN)

    # Users to monitor
    vip_users = [
        "elonmusk", "billgates", "twitter",
        "nasa", "openai", "github"
    ]

    # Option 1: Single check
    print("=== Single Status Check ===")
    changes = monitor.check_status_changes(vip_users)

    # Option 2: Continuous monitoring (commented out for example)
    # monitor.monitor_continuously(
    #     vip_users,
    #     interval_minutes=5,
    #     duration_hours=1
    # )


if __name__ == "__main__":
    main()
Best Practices
1. Efficient Batch Processing
Always check multiple users in a single request to minimize API calls:
import time
from typing import Dict, List

# Reuses check_dm_status() from Example 1 and an API_TOKEN loaded as shown
# under "Protecting Your API Token".


def check_dm_status_in_batches(
    all_usernames: List[str],
    api_token: str,
    batch_size: int = 50
) -> List[Dict]:
    """
    Check DM status in batches for large user lists

    Args:
        all_usernames: Complete list of usernames
        api_token: API bearer token
        batch_size: Number of users per batch

    Returns:
        Combined results from all batches
    """
    all_results = []
    total_batches = (len(all_usernames) + batch_size - 1) // batch_size

    print(f"📦 Processing {len(all_usernames)} users in {total_batches} batches")

    for i in range(0, len(all_usernames), batch_size):
        batch = all_usernames[i:i + batch_size]
        batch_num = (i // batch_size) + 1

        print(f"  Batch {batch_num}/{total_batches}: {len(batch)} users", end='')

        result = check_dm_status(batch, api_token)

        if result and 'data' in result:
            all_results.extend(result['data'])
            print(" ✅")
        else:
            print(" ❌")

        # Rate limiting: wait between batches
        if i + batch_size < len(all_usernames):
            time.sleep(1)

    print(f"\n✅ Processed {len(all_results)} users total")
    return all_results


# Example: Check 150 users
large_user_list = [f"user{i}" for i in range(1, 151)]
results = check_dm_status_in_batches(large_user_list, API_TOKEN, batch_size=50)
2. Caching to Reduce API Calls
Implement caching to avoid repeated checks:
from datetime import datetime, timedelta
from typing import Dict, List, Optional

# Reuses check_dm_status() from Example 1 and an API_TOKEN loaded as shown
# under "Protecting Your API Token".


class DMStatusCache:
    def __init__(self, cache_duration_hours: int = 24):
        """
        Initialize cache with configurable duration

        Args:
            cache_duration_hours: How long to cache results
        """
        self.cache = {}
        self.cache_duration = timedelta(hours=cache_duration_hours)

    def get(self, username: str) -> Optional[bool]:
        """Get cached DM status if still valid"""
        if username in self.cache:
            status, timestamp = self.cache[username]
            if datetime.now() - timestamp < self.cache_duration:
                return status
        return None

    def set(self, username: str, dm_status: bool):
        """Cache DM status with timestamp"""
        self.cache[username] = (dm_status, datetime.now())

    def invalidate(self, username: str):
        """Invalidate cache for specific user"""
        if username in self.cache:
            del self.cache[username]

    def clear(self):
        """Clear entire cache"""
        self.cache.clear()

    def get_stats(self) -> Dict:
        """Get cache statistics"""
        now = datetime.now()
        valid_entries = sum(
            1 for _, timestamp in self.cache.values()
            if now - timestamp < self.cache_duration
        )

        return {
            'total_entries': len(self.cache),
            'valid_entries': valid_entries,
            'expired_entries': len(self.cache) - valid_entries
        }


# Usage with cache
cache = DMStatusCache(cache_duration_hours=24)


def check_dm_status_with_cache(
    usernames: List[str],
    api_token: str,
    use_cache: bool = True
) -> List[Dict]:
    """Check DM status with caching support"""

    uncached_users = []
    results = []
    cache_hits = 0

    # Check cache first
    if use_cache:
        for username in usernames:
            cached_status = cache.get(username)
            if cached_status is not None:
                results.append({
                    'username': username,
                    'dm_status': cached_status,
                    'cached': True
                })
                cache_hits += 1
            else:
                uncached_users.append(username)
    else:
        uncached_users = usernames

    # Fetch uncached users from API
    if uncached_users:
        print(f"🔍 Checking {len(uncached_users)} users via API...")
        api_result = check_dm_status(uncached_users, api_token)

        if api_result and 'data' in api_result:
            for user_data in api_result['data']:
                # Cache the result
                cache.set(user_data['username'], user_data['dm_status'])
                user_data['cached'] = False
                results.append(user_data)

    if use_cache and cache_hits > 0:
        print(f"⚡ Cache hits: {cache_hits}/{len(usernames)} ({cache_hits/len(usernames)*100:.1f}%)")
        print(f"💰 Saved {cache_hits} API calls")

    return results


# Example usage
usernames = ["elonmusk", "billgates", "twitter"]

# First call - fetches from API
print("=== First Check ===")
results1 = check_dm_status_with_cache(usernames, API_TOKEN)

# Second call - uses cache
print("\n=== Second Check (cached) ===")
results2 = check_dm_status_with_cache(usernames, API_TOKEN)

# Check cache stats
stats = cache.get_stats()
print(f"\n📊 Cache Stats: {stats}")
3. Error Handling and Retry Logic
Implement robust error handling:
import time
from typing import Optional, Dict, List

import requests

# Uses an API_TOKEN loaded as shown under "Protecting Your API Token".


def check_dm_status_with_retry(
    usernames: List[str],
    api_token: str,
    max_retries: int = 3,
    retry_delay: int = 2
) -> Optional[Dict]:
    """
    Check DM status with automatic retry on failure

    Args:
        usernames: List of usernames to check
        api_token: API bearer token
        max_retries: Maximum number of retry attempts
        retry_delay: Delay between retries in seconds

    Returns:
        API response or None if all retries failed
    """
    url = "https://api.twexapi.io/dm/status"
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }

    for attempt in range(max_retries):
        try:
            print(f"🔄 Attempt {attempt + 1}/{max_retries}...")

            response = requests.post(
                url,
                json=usernames,
                headers=headers,
                timeout=15
            )

            if response.status_code == 200:
                print(f"✅ Success on attempt {attempt + 1}")
                return response.json()

            elif response.status_code == 429:
                # Rate limit - wait longer, then skip the normal backoff below
                wait_time = retry_delay * (attempt + 1) * 2
                print(f"⏱️ Rate limit hit, waiting {wait_time}s...")
                time.sleep(wait_time)
                continue

            elif response.status_code == 401:
                # Auth error - don't retry
                print("❌ Authentication error: Check your API token")
                return None

            elif response.status_code == 422:
                # Validation error - don't retry
                print("❌ Validation error: Check username format")
                print(response.text)
                return None

            else:
                print(f"⚠️ HTTP {response.status_code}: {response.text}")

        except requests.exceptions.Timeout:
            print(f"⏱️ Request timeout on attempt {attempt + 1}")

        except requests.exceptions.ConnectionError:
            print(f"🔌 Connection error on attempt {attempt + 1}")

        except requests.exceptions.RequestException as e:
            print(f"❌ Request error: {e}")

        except Exception as e:
            # Anything else is treated as non-retryable
            print(f"❌ Unexpected error: {e}")
            return None

        # Wait before retry (except on last attempt)
        if attempt < max_retries - 1:
            wait = retry_delay * (2 ** attempt)  # Exponential backoff
            print(f"💤 Waiting {wait}s before retry...")
            time.sleep(wait)

    print(f"❌ All {max_retries} attempts failed")
    return None


# Example usage with retry
usernames = ["elonmusk", "billgates", "twitter"]
result = check_dm_status_with_retry(
    usernames,
    API_TOKEN,
    max_retries=3,
    retry_delay=2
)
Security Best Practices
Protecting Your API Token
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Get API token from environment variable
API_TOKEN = os.getenv('TWITTERXAPI_TOKEN')

if not API_TOKEN:
    raise ValueError("TWITTERXAPI_TOKEN not found in environment variables")

# Use the token (check_dm_status is defined in Example 1)
result = check_dm_status(["elonmusk"], API_TOKEN)
Create a .env file:
# TwitterXApi Configuration
TWITTERXAPI_TOKEN=your_actual_token_here
Add it to .gitignore:
.env
*.env
.env.local
.env.production
Getting an API Token
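Using TwitterXApi requires a Bearer token:
- Sign up at TwitterXApi.com
- Open the dashboard
- Create a new API key
- Copy the Bearer token
- Store it securely in an environment variable
Real-World Implementation Example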
The following is a complete implementation intended for production use:
"""
TwitterXApi DM Status Checker
A production-ready implementation with all best practices
"""

import os
import requests
import time
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from dataclasses import dataclass
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class DMStatusResult:
    """Data class for DM status results"""
    username: str
    dm_status: bool
    checked_at: datetime
    cached: bool = False


class DMStatusChecker:
    """
    Production-ready DM status checker with caching,
    error handling, and retry logic
    """

    def __init__(
        self,
        api_token: Optional[str] = None,
        cache_duration_hours: int = 24,
        max_retries: int = 3
    ):
        """
        Initialize DM status checker

        Args:
            api_token: TwitterXApi bearer token (or from env)
            cache_duration_hours: Cache validity duration
            max_retries: Maximum retry attempts
        """
        # Load API token
        if api_token is None:
            load_dotenv()
            api_token = os.getenv('TWITTERXAPI_TOKEN')

        if not api_token:
            raise ValueError("API token required")

        self.api_token = api_token
        self.url = "https://api.twexapi.io/dm/status"
        self.cache = {}
        self.cache_duration = timedelta(hours=cache_duration_hours)
        self.max_retries = max_retries

        logger.info("DMStatusChecker initialized")

    def check(
        self,
        usernames: List[str],
        use_cache: bool = True
    ) -> List[DMStatusResult]:
        """
        Check DM status for users

        Args:
            usernames: List of Twitter usernames
            use_cache: Whether to use cached results

        Returns:
            List of DMStatusResult objects
        """
        if not usernames:
            logger.warning("Empty username list provided")
            return []

        logger.info(f"Checking DM status for {len(usernames)} users")

        # Separate cached and uncached users
        results = []
        uncached = []

        if use_cache:
            for username in usernames:
                cached_result = self._get_from_cache(username)
                if cached_result:
                    results.append(cached_result)
                else:
                    uncached.append(username)

            if results:
                logger.info(f"Cache hits: {len(results)}/{len(usernames)}")
        else:
            uncached = usernames

        # Fetch uncached users from API
        if uncached:
            api_results = self._fetch_from_api(uncached)
            if api_results:
                results.extend(api_results)

        logger.info(f"Total results: {len(results)}")
        return results

    def _get_from_cache(self, username: str) -> Optional[DMStatusResult]:
        """Get result from cache if valid"""
        if username in self.cache:
            result, timestamp = self.cache[username]
            if datetime.now() - timestamp < self.cache_duration:
                return DMStatusResult(
                    username=username,
                    dm_status=result,
                    checked_at=timestamp,
                    cached=True
                )
        return None

    def _set_cache(self, username: str, dm_status: bool):
        """Store result in cache"""
        self.cache[username] = (dm_status, datetime.now())

    def _fetch_from_api(
        self,
        usernames: List[str]
    ) -> List[DMStatusResult]:
        """Fetch DM status from API with retry logic"""
        headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        for attempt in range(self.max_retries):
            try:
                logger.info(f"API request attempt {attempt + 1}/{self.max_retries}")

                response = requests.post(
                    self.url,
                    json=usernames,
                    headers=headers,
                    timeout=15
                )

                if response.status_code == 200:
                    data = response.json()
                    results = []

                    for user_data in data.get('data', []):
                        username = user_data['username']
                        dm_status = user_data['dm_status']

                        # Cache the result
                        self._set_cache(username, dm_status)

                        # Create result object
                        results.append(DMStatusResult(
                            username=username,
                            dm_status=dm_status,
                            checked_at=datetime.now(),
                            cached=False
                        ))

                    logger.info(f"Successfully fetched {len(results)} results")
                    return results

                elif response.status_code == 429:
                    # Rate limited: wait, then retry without the extra backoff below
                    wait_time = 60 * (attempt + 1)
                    logger.warning(f"Rate limit hit, waiting {wait_time}s")
                    time.sleep(wait_time)
                    continue

                elif response.status_code in [401, 422]:
                    # Auth and validation errors will not succeed on retry
                    logger.error(f"API error {response.status_code}: {response.text}")
                    return []

                else:
                    logger.warning(f"HTTP {response.status_code}: {response.text}")

            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed: {e}")

            # Exponential backoff
            if attempt < self.max_retries - 1:
                wait = 2 ** attempt
                logger.info(f"Waiting {wait}s before retry")
                time.sleep(wait)

        logger.error("All retry attempts failed")
        return []

    def get_cache_stats(self) -> Dict:
        """Get cache statistics"""
        now = datetime.now()
        valid = sum(
            1 for _, timestamp in self.cache.values()
            if now - timestamp < self.cache_duration
        )

        return {
            'total_entries': len(self.cache),
            'valid_entries': valid,
            'expired_entries': len(self.cache) - valid
        }

    def clear_cache(self):
        """Clear the cache"""
        self.cache.clear()
        logger.info("Cache cleared")


# Example usage
def main():
    """Example usage of DMStatusChecker"""

    # Initialize checker
    checker = DMStatusChecker(
        cache_duration_hours=24,
        max_retries=3
    )

    # Test users
    usernames = [
        "elonmusk", "billgates", "twitter",
        "nasa", "openai", "github"
    ]

    # Check DM status
    results = checker.check(usernames, use_cache=True)

    # Nothing to report (and nothing to divide by) if every request failed
    if not results:
        print("No results returned")
        return

    # Display results
    print("\n" + "="*60)
    print("DM STATUS REPORT".center(60))
    print("="*60 + "\n")

    open_count = 0
    closed_count = 0

    for result in results:
        status_emoji = "✅" if result.dm_status else "❌"
        status_text = "Open" if result.dm_status else "Closed"
        cache_indicator = "📦" if result.cached else "🌐"

        print(f"{status_emoji} @{result.username:15} DMs: {status_text:8} {cache_indicator}")

        if result.dm_status:
            open_count += 1
        else:
            closed_count += 1

    # Summary
    print("\n" + "-"*60)
    print(f"Total Users: {len(results)}")
    print(f"DMs Open: {open_count} ({open_count/len(results)*100:.1f}%)")
    print(f"DMs Closed: {closed_count} ({closed_count/len(results)*100:.1f}%)")
    print("-"*60)

    # Cache stats
    cache_stats = checker.get_cache_stats()
    print(f"\nCache: {cache_stats['valid_entries']} valid entries")
    print("="*60)


if __name__ == "__main__":
    main()
Conclusion
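The TwitterXApi DM Status endpoint is an essential tool for building Twitter automation, social media management systems, or customer engagement applications. By checking DM status before you try to send a message, you can:
- ✅ Improve efficiency: avoid failed message attempts
- 💰 Lower costs: cut unnecessary API calls
- 🎯 Target better: work from accurate contact information
- 🚀 Strengthen automation: build smarter bot logic
- 📊 Analyze better: track DM availability
Key Takeaways
- Batch processing: always check multiple users at once
- Caching: cache results to reduce API calls
- Error handling: use retry logic and handle errors appropriately
- Security: store the API token in an environment variable
- Monitoring: track DM status changes over time
- Integration: combine with CRM and marketing tools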
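Quick Start
- Sign up at TwitterXApi.com
- Get a Bearer token from the dashboard
- Install the required packages (requests, python-dotenv)
- Implement the basic check function
- Add caching and error handling
- Scale up to production use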
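Next Steps
- Explore other TwitterXApi endpoints for end-to-end automation
- Build an integrated social media management dashboard
- Implement a real-time monitoring system
- Create intelligent chatbots and customer support tools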
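Start using TwitterXApi's DM status check today to build smarter Twitter automation!
This guide is based on the TwitterXApi documentation as of January 2025. Always refer to the official documentation for the latest information.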