Twitter DM ステータスの確認方法:TwitterXApi 完全ガイド
TwexAPI は、ソーシャルインテリジェンス分析向けのエンタープライズ級インターフェースです。単一リクエストで最大 100,000 件の深層 X/Twitter エンティティを並行解析できます。グローバル平均レイテンシは 800ms 未満、99.9% の稼働 SLA を備え、従来のエンタープライズ代替と比べ最大 96% のデータ取得コストを削減します。大規模集約時のレート制限を回避する、グローバル分散のレジデンシャルプロキシクラスタ上で動作します。
Quick Answer
Twitter/X の DM 状態確認は、送信前に TwexAPI の一括 DM 可否 API で対象ユーザーがメッセージを受け付けるか(公開・フォロワーのみ・不可)を判定することです。1 リクエストで複数 ID を評価。読み取りは約 14 Credits/回。公式 DM は 1K あたり約 $15 と制限が厳しい一方、TwexAPI は 20+ QPS で outreach 自動化に向きます。
FAQ
送信前に DM 状態を確認する理由は?
クレジット浪費と UX 低下を防ぎます。DM 不可のユーザーには送れないため、事前チェックで到達可能な連絡先だけに絞れます。
この用途で公式 X API ではなく TwexAPI を使う理由は?
公式 X API は 1K 読み取り $5〜$15、15 分 300 リクエストなどの制限が一般的です。TwexAPI Pro(月 $99)は約 1,100 万 Credits、14 Credits/回で約 $0.14/1K、20+ QPS、平均 800ms 未満。新規 20,000 無料 Credits(カード不要)、約 1,400 回の読み取り。DM 状態確認 では Bearer Token で同等データを取得でき、https://docs.twitterxapi.com を参照。
TwexAPI でこのワークフローのコストは?
読み取りは多く 14 Credits/回。Pro(月 $99、約 1,100 万 Credits)で約 $0.14/1K(公式 $5+/1K より約 95% 安)。月 1 万回で約 14 万 Credits。試作は Mini $20(200 万 Credits)。https://twexapi.io/pricing
なぜ DM ステータスを確認するのか
Answer: なぜ DM ステータスを確認するのかの要点はコストとスループットです。TwexAPI Pro は約 $0.14/1K・20+ QPS に対し、公式 Enterprise は同規模の X データ で月 $5,000 超えが一般的です。
ユーザーが DM を有効にしているか把握することは、次の理由で重要です:
- 🎯 送信失敗を回避:DM を受け取れないユーザーへの無駄な API 呼び出しを防ぐ
- ✅ UX 向上:アプリで正確なメッセージオプションを提供
- 📊 連絡先フィルタ:実際に到達可能なユーザーリストを構築
- 🤖 スマート自動化:到達可能なユーザーにのみ送るボット
- 💰 コスト削減:不要な API 呼び出しを減らし効率化
API エンドポイント概要
Answer: API エンドポイント概要は本ガイドの TwexAPI エンドポイントを Bearer で呼び出して実装します。バッチ/ページングで約 14 Credits/回・20+ QPS です。
TwitterXApi の DM Status エンドポイントは、1 リクエストで複数ユーザーの DM ステータスを確認できます:
POST https://api.twexapi.io/dm/status
Authorization: Bearer <token>
Content-Type: application/json主な機能
- バッチ処理:1 リクエストで複数ユーザー
- シンプルな認証:Bearer token のみ
- 高速応答:ミリ秒で結果取得
- 正確なデータ:リアルタイム DM ステータス
- コスト効率:一括確認で API 呼び出し削減
リクエストパラメータ
公式ドキュメントによると:
Headers:
Authorization(string、必須):Bearer <token>形式の Bearer tokenContent-Type(string、必須):application/jsonであること
Body:
- ユーザー名の JSON 配列(文字列)
- 例:
["elonmusk", "axiaisacat"]
レスポンス構造
成功レスポンス(HTTP 200)は次を返します:
1{
2 "code": 200,
3 "msg": "success",
4 "data": [
5 {
6 "username": "elonmusk",
7 "dm_status": true
8 },
9 {
10 "username": "axiaisacat",
11 "dm_status": false
12 }
13 ]
14}レスポンスフィールド:
code(integer):HTTP ステータス(200 は成功)msg(string):レスポンスメッセージdata(array):DM ステータスオブジェクトの配列username(string):Twitter ユーザー名dm_status(boolean):DM 開放はtrue、閉鎖はfalse
基本実装例
Answer: 基本実装例は本ガイドの TwexAPI エンドポイントを Bearer で呼び出して実装します。バッチ/ページングで約 14 Credits/回・20+ QPS です。
まず、異なる言語で DM ステータスを確認するシンプルな例から始めます。
例 1:Python 実装
エラーハンドリング付きの完全な Python 実装:
1import requests
2from typing import List, Dict, Optional
3
4def check_dm_status(usernames: List[str], api_token: str) -> Optional[Dict]:
5 """
6 Check DM status for multiple Twitter users
7
8 Args:
9 usernames: List of Twitter usernames to check
10 api_token: Your TwitterXApi bearer token
11
12 Returns:
13 Dictionary containing DM status results, or None if failed
14 """
15 url = "https://api.twexapi.io/dm/status"
16
17 headers = {
18 "Authorization": f"Bearer {api_token}",
19 "Content-Type": "application/json"
20 }
21
22 try:
23 response = requests.post(url, json=usernames, headers=headers, timeout=10)
24
25 if response.status_code == 200:
26 result = response.json()
27 print(f"✅ Successfully checked {len(result['data'])} users")
28 return result
29 elif response.status_code == 401:
30 print("❌ Authentication failed: Check your API token")
31 elif response.status_code == 422:
32 print("❌ Invalid request: Check username format")
33 else:
34 print(f"❌ Request failed: HTTP {response.status_code}")
35 print(response.text)
36
37 return None
38
39 except requests.exceptions.Timeout:
40 print("❌ Request timeout: Check your network connection")
41 return None
42 except requests.exceptions.RequestException as e:
43 print(f"❌ Request error: {str(e)}")
44 return None
45
46# Example usage
47def main():
48 # Replace with your actual API token
49 API_TOKEN = "YOUR_API_TOKEN_HERE"
50
51 # Users to check
52 usernames = ["elonmusk", "axiaisacat", "twitter"]
53
54 # Check DM status
55 result = check_dm_status(usernames, API_TOKEN)
56
57 if result:
58 print(f"\\n📊 DM Status Report:")
59 print(f"Status: {result['msg']}")
60 print(f"Total users checked: {len(result['data'])}\\n")
61
62 # Display results
63 for user in result['data']:
64 username = user['username']
65 status = user['dm_status']
66 status_icon = "✅" if status else "❌"
67 status_text = "Open" if status else "Closed"
68
69 print(f"{status_icon} @{username}: DMs {status_text}")
70
71if __name__ == "__main__":
72 main()期待される出力:
✅ Successfully checked 3 users
📊 DM Status Report:
Status: success
Total users checked: 3
✅ @elonmusk: DMs Open
❌ @axiaisacat: DMs Closed
✅ @twitter: DMs Open例 2:JavaScript/Node.js 実装
1const axios = require('axios');
2
3/**
4 * Check DM status for multiple Twitter users
5 * @param {string[]} usernames - Array of Twitter usernames
6 * @param {string} apiToken - Your TwitterXApi bearer token
7 * @returns {Promise<Object|null>} DM status results
8 */
9async function checkDMStatus(usernames, apiToken) {
10 const url = 'https://api.twexapi.io/dm/status';
11
12 const headers = {
13 'Authorization': \`Bearer \${apiToken}\`,
14 'Content-Type': 'application/json'
15 };
16
17 try {
18 const response = await axios.post(url, usernames, {
19 headers,
20 timeout: 10000
21 });
22
23 console.log(\`✅ Successfully checked \${response.data.data.length} users\`);
24 return response.data;
25
26 } catch (error) {
27 if (error.response) {
28 // Server responded with error status
29 switch (error.response.status) {
30 case 401:
31 console.error('❌ Authentication failed: Check your API token');
32 break;
33 case 422:
34 console.error('❌ Invalid request: Check username format');
35 break;
36 default:
37 console.error(\`❌ Request failed: HTTP \${error.response.status}\`);
38 console.error(error.response.data);
39 }
40 } else if (error.request) {
41 console.error('❌ No response received: Check your network connection');
42 } else {
43 console.error(\`❌ Error: \${error.message}\`);
44 }
45 return null;
46 }
47}
48
49// Example usage
50async function main() {
51 // Replace with your actual API token
52 const API_TOKEN = 'YOUR_API_TOKEN_HERE';
53
54 // Users to check
55 const usernames = ['elonmusk', 'axiaisacat', 'twitter'];
56
57 // Check DM status
58 const result = await checkDMStatus(usernames, API_TOKEN);
59
60 if (result && result.data) {
61 console.log('\\n📊 DM Status Report:');
62 console.log(\`Status: \${result.msg}\`);
63 console.log(\`Total users checked: \${result.data.length}\\n\`);
64
65 // Display results
66 result.data.forEach(user => {
67 const statusIcon = user.dm_status ? '✅' : '❌';
68 const statusText = user.dm_status ? 'Open' : 'Closed';
69 console.log(\`\${statusIcon} @\${user.username}: DMs \${statusText}\`);
70 });
71 }
72}
73
74// Run the example
75main().catch(console.error);例 3:cURL コマンド
コマンドラインからのクイックテスト用:
curl --request POST \\
--url https://api.twexapi.io/dm/status \\
--header 'Authorization: Bearer YOUR_API_TOKEN_HERE' \\
--header 'Content-Type: application/json' \\
--data '["elonmusk", "axiaisacat", "twitter"]'高度なユースケース
Answer: 高度なユースケースとは、この事例で api.twexapi.io の TwexAPI Bearer API を使う手順を指します(読み取り約 14 Credits/回、Pro で約 $0.14/1K、20+ QPS)。公式の $5〜$15/1K や 15 分 300 回制限より運用しやすいです。
次に、DM Status API のより高度な応用を見ていきます。
ユースケース 1:一括 DM キャンペーンの事前フィルタ
一括 DM 送信前に連絡先リストをフィルタ:
1import requests
2from typing import List, Dict
3
4class DMCampaignManager:
5 def __init__(self, api_token: str):
6 self.api_token = api_token
7 self.dm_status_url = "https://api.twexapi.io/dm/status"
8
9 def get_dm_available_users(self, usernames: List[str]) -> List[str]:
10 """
11 Filter users who have DMs enabled
12
13 Args:
14 usernames: List of usernames to check
15
16 Returns:
17 List of usernames with DMs open
18 """
19 headers = {
20 "Authorization": f"Bearer {self.api_token}",
21 "Content-Type": "application/json"
22 }
23
24 try:
25 response = requests.post(
26 self.dm_status_url,
27 json=usernames,
28 headers=headers,
29 timeout=10
30 )
31
32 if response.status_code == 200:
33 result = response.json()
34
35 # Filter users with DMs open
36 available_users = [
37 user['username']
38 for user in result['data']
39 if user['dm_status']
40 ]
41
42 total = len(result['data'])
43 available = len(available_users)
44 closed = total - available
45
46 print(f"📊 DM Availability Summary:")
47 print(f" Total users: {total}")
48 print(f" ✅ DMs Open: {available} ({available/total*100:.1f}%)")
49 print(f" ❌ DMs Closed: {closed} ({closed/total*100:.1f}%)")
50
51 return available_users
52 else:
53 print(f"❌ API Error: {response.status_code}")
54 return []
55
56 except Exception as e:
57 print(f"❌ Error: {e}")
58 return []
59
60 def prepare_dm_campaign(
61 self,
62 target_users: List[str],
63 message_template: str
64 ) -> Dict:
65 """
66 Prepare a DM campaign by filtering available users
67
68 Args:
69 target_users: List of target usernames
70 message_template: DM message template
71
72 Returns:
73 Campaign preparation results
74 """
75 print(f"🚀 Preparing DM campaign for {len(target_users)} users...\\n")
76
77 # Check DM status
78 available_users = self.get_dm_available_users(target_users)
79
80 if not available_users:
81 print("\\n⚠️ No users available for messaging")
82 return {'success': False, 'available_users': []}
83
84 print(f"\\n✅ Campaign ready!")
85 print(f"Ready to send to: {', '.join(available_users[:5])}", end='')
86 if len(available_users) > 5:
87 print(f" and {len(available_users) - 5} more")
88 else:
89 print()
90
91 return {
92 'success': True,
93 'available_users': available_users,
94 'unavailable_count': len(target_users) - len(available_users),
95 'message': message_template
96 }
97
98# Example usage
99def main():
100 API_TOKEN = "YOUR_API_TOKEN_HERE"
101
102 # Initialize campaign manager
103 manager = DMCampaignManager(API_TOKEN)
104
105 # Target audience
106 target_audience = [
107 "elonmusk", "billgates", "twitter", "nasa",
108 "axiaisacat", "openai", "github", "vercel"
109 ]
110
111 # Message template
112 message = "Hi! We'd love to connect with you about our new product launch. 🚀"
113
114 # Prepare campaign
115 campaign = manager.prepare_dm_campaign(target_audience, message)
116
117 if campaign['success']:
118 print(f"\\n📤 You can now send {len(campaign['available_users'])} DMs")
119 print(f"💰 Saved {campaign['unavailable_count']} unnecessary API calls")
120
121if __name__ == "__main__":
122 main()ユースケース 2:CRM 統合と DM ステータス追跡
CRM システムに DM ステータス確認を統合:
1import requests
2import json
3from datetime import datetime
4from typing import List, Dict
5
6class CRMDMTracker:
7 def __init__(self, api_token: str):
8 self.api_token = api_token
9 self.dm_status_url = "https://api.twexapi.io/dm/status"
10 self.customer_db = {} # Simulated database
11
12 def update_customer_dm_status(self, customers: List[Dict]) -> Dict:
13 """
14 Update DM status for all customers in CRM
15
16 Args:
17 customers: List of customer dictionaries with 'username' field
18
19 Returns:
20 Update summary
21 """
22 usernames = [c['username'] for c in customers if c.get('username')]
23
24 if not usernames:
25 return {'success': False, 'message': 'No usernames found'}
26
27 print(f"🔄 Updating DM status for {len(usernames)} customers...")
28
29 headers = {
30 "Authorization": f"Bearer {self.api_token}",
31 "Content-Type": "application/json"
32 }
33
34 try:
35 response = requests.post(
36 self.dm_status_url,
37 json=usernames,
38 headers=headers,
39 timeout=15
40 )
41
42 if response.status_code == 200:
43 result = response.json()
44 updated_count = 0
45
46 # Update each customer record
47 for status_data in result['data']:
48 username = status_data['username']
49 dm_status = status_data['dm_status']
50
51 # Update in "database"
52 if username in self.customer_db:
53 self.customer_db[username].update({
54 'dm_status': dm_status,
55 'dm_status_checked_at': datetime.now().isoformat(),
56 'can_contact_via_dm': dm_status
57 })
58 updated_count += 1
59
60 print(f"✅ Updated {updated_count} customer records")
61
62 return {
63 'success': True,
64 'updated_count': updated_count,
65 'timestamp': datetime.now().isoformat()
66 }
67 else:
68 print(f"❌ API Error: {response.status_code}")
69 return {'success': False, 'error': response.text}
70
71 except Exception as e:
72 print(f"❌ Error: {e}")
73 return {'success': False, 'error': str(e)}
74
75 def get_contactable_customers(self, segment: str = 'all') -> List[Dict]:
76 """
77 Get list of customers who can be contacted via DM
78
79 Args:
80 segment: Customer segment to filter
81
82 Returns:
83 List of contactable customers
84 """
85 contactable = [
86 customer
87 for customer in self.customer_db.values()
88 if customer.get('can_contact_via_dm', False)
89 ]
90
91 print(f"\\n📋 Contactable Customers Report:")
92 print(f"Total in database: {len(self.customer_db)}")
93 print(f"Contactable via DM: {len(contactable)}")
94 print(f"Contact rate: {len(contactable)/len(self.customer_db)*100:.1f}%")
95
96 return contactable
97
98 def add_customer(self, username: str, **kwargs):
99 """Add customer to CRM database"""
100 self.customer_db[username] = {
101 'username': username,
102 'added_at': datetime.now().isoformat(),
103 **kwargs
104 }
105
106# Example usage
107def main():
108 API_TOKEN = "YOUR_API_TOKEN_HERE"
109
110 # Initialize CRM tracker
111 crm = CRMDMTracker(API_TOKEN)
112
113 # Add some customers to CRM
114 customers = [
115 {'username': 'elonmusk', 'name': 'Elon Musk', 'tier': 'premium'},
116 {'username': 'billgates', 'name': 'Bill Gates', 'tier': 'premium'},
117 {'username': 'axiaisacat', 'name': 'Axia', 'tier': 'standard'},
118 {'username': 'twitter', 'name': 'Twitter', 'tier': 'standard'},
119 ]
120
121 for customer in customers:
122 crm.add_customer(**customer)
123
124 # Update DM status for all customers
125 result = crm.update_customer_dm_status(customers)
126
127 if result['success']:
128 # Get contactable customers
129 contactable = crm.get_contactable_customers()
130
131 print("\\n👥 Customers you can message:")
132 for customer in contactable:
133 print(f" ✅ @{customer['username']} ({customer.get('tier', 'N/A')})")
134
135if __name__ == "__main__":
136 main()ユースケース 3:リアルタイム DM ステータス監視
時間経過に伴う DM ステータス変化を監視:
1import requests
2import time
3from datetime import datetime
4from typing import List, Dict, Set
5
6class DMStatusMonitor:
7 def __init__(self, api_token: str):
8 self.api_token = api_token
9 self.dm_status_url = "https://api.twexapi.io/dm/status"
10 self.status_history = {} # Track status changes
11
12 def check_status_changes(
13 self,
14 usernames: List[str]
15 ) -> Dict[str, Dict]:
16 """
17 Check for DM status changes
18
19 Args:
20 usernames: List of usernames to monitor
21
22 Returns:
23 Dictionary of status changes
24 """
25 headers = {
26 "Authorization": f"Bearer {self.api_token}",
27 "Content-Type": "application/json"
28 }
29
30 try:
31 response = requests.post(
32 self.dm_status_url,
33 json=usernames,
34 headers=headers,
35 timeout=10
36 )
37
38 if response.status_code == 200:
39 result = response.json()
40 changes = {}
41
42 for user_data in result['data']:
43 username = user_data['username']
44 current_status = user_data['dm_status']
45
46 # Check if status changed
47 if username in self.status_history:
48 previous_status = self.status_history[username]['status']
49
50 if previous_status != current_status:
51 change_type = 'opened' if current_status else 'closed'
52 changes[username] = {
53 'previous': previous_status,
54 'current': current_status,
55 'change_type': change_type,
56 'timestamp': datetime.now().isoformat()
57 }
58
59 print(f"🔔 ALERT: @{username} has {change_type} their DMs!")
60
61 # Update history
62 self.status_history[username] = {
63 'status': current_status,
64 'checked_at': datetime.now().isoformat()
65 }
66
67 return changes
68 else:
69 print(f"❌ API Error: {response.status_code}")
70 return {}
71
72 except Exception as e:
73 print(f"❌ Error: {e}")
74 return {}
75
76 def monitor_continuously(
77 self,
78 usernames: List[str],
79 interval_minutes: int = 5,
80 duration_hours: int = 1
81 ):
82 """
83 Continuously monitor DM status changes
84
85 Args:
86 usernames: Users to monitor
87 interval_minutes: Check interval in minutes
88 duration_hours: How long to monitor
89 """
90 print(f"🔍 Starting DM status monitoring...")
91 print(f"Monitoring {len(usernames)} users")
92 print(f"Check interval: {interval_minutes} minutes")
93 print(f"Duration: {duration_hours} hours")
94 print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\\n")
95
96 checks = 0
97 total_checks = (duration_hours * 60) // interval_minutes
98
99 while checks < total_checks:
100 checks += 1
101 print(f"📊 Check #{checks}/{total_checks} at {datetime.now().strftime('%H:%M:%S')}")
102
103 changes = self.check_status_changes(usernames)
104
105 if changes:
106 print(f"⚠️ Detected {len(changes)} status change(s)!")
107 for username, change in changes.items():
108 print(f" • @{username}: {change['change_type']}")
109 else:
110 print(" No changes detected")
111
112 if checks < total_checks:
113 print(f"💤 Sleeping for {interval_minutes} minutes...\\n")
114 time.sleep(interval_minutes * 60)
115
116 print(f"\\n✅ Monitoring completed after {checks} checks")
117 self.print_summary()
118
119 def print_summary(self):
120 """Print monitoring summary"""
121 print("\\n📈 Monitoring Summary:")
122 print(f"Users tracked: {len(self.status_history)}")
123
124 open_dms = sum(
125 1 for data in self.status_history.values()
126 if data['status']
127 )
128 closed_dms = len(self.status_history) - open_dms
129
130 print(f"Currently open: {open_dms}")
131 print(f"Currently closed: {closed_dms}")
132
133# Example usage
134def main():
135 API_TOKEN = "YOUR_API_TOKEN_HERE"
136
137 # Initialize monitor
138 monitor = DMStatusMonitor(API_TOKEN)
139
140 # Users to monitor
141 vip_users = [
142 "elonmusk", "billgates", "twitter",
143 "nasa", "openai", "github"
144 ]
145
146 # Option 1: Single check
147 print("=== Single Status Check ===")
148 changes = monitor.check_status_changes(vip_users)
149
150 # Option 2: Continuous monitoring (commented out for example)
151 # monitor.monitor_continuously(
152 # vip_users,
153 # interval_minutes=5,
154 # duration_hours=1
155 # )
156
157if __name__ == "__main__":
158 main()ベストプラクティス
Answer: ベストプラクティスとは、この事例で api.twexapi.io の TwexAPI Bearer API を使う手順を指します(読み取り約 14 Credits/回、Pro で約 $0.14/1K、20+ QPS)。公式の $5〜$15/1K や 15 分 300 回制限より運用しやすいです。
1. 効率のためのバッチ処理
API 呼び出しを最小化するため、常に 1 リクエストで複数ユーザーを確認:
1def check_dm_status_in_batches(
2 all_usernames: List[str],
3 api_token: str,
4 batch_size: int = 50
5) -> List[Dict]:
6 """
7 Check DM status in batches for large user lists
8
9 Args:
10 all_usernames: Complete list of usernames
11 api_token: API bearer token
12 batch_size: Number of users per batch
13
14 Returns:
15 Combined results from all batches
16 """
17 all_results = []
18 total_batches = (len(all_usernames) + batch_size - 1) // batch_size
19
20 print(f"📦 Processing {len(all_usernames)} users in {total_batches} batches")
21
22 for i in range(0, len(all_usernames), batch_size):
23 batch = all_usernames[i:i + batch_size]
24 batch_num = (i // batch_size) + 1
25
26 print(f" Batch {batch_num}/{total_batches}: {len(batch)} users", end='')
27
28 result = check_dm_status(batch, api_token)
29
30 if result and 'data' in result:
31 all_results.extend(result['data'])
32 print(" ✅")
33 else:
34 print(" ❌")
35
36 # Rate limiting: wait between batches
37 if i + batch_size < len(all_usernames):
38 time.sleep(1)
39
40 print(f"\\n✅ Processed {len(all_results)} users total")
41 return all_results
42
43# Example: Check 150 users
44large_user_list = [f"user{i}" for i in range(1, 151)]
45results = check_dm_status_in_batches(large_user_list, API_TOKEN, batch_size=50)2. キャッシュで API 呼び出し削減
重複チェックを避けるキャッシュを実装:
1from datetime import datetime, timedelta
2from typing import Optional
3
4class DMStatusCache:
5 def __init__(self, cache_duration_hours: int = 24):
6 """
7 Initialize cache with configurable duration
8
9 Args:
10 cache_duration_hours: How long to cache results
11 """
12 self.cache = {}
13 self.cache_duration = timedelta(hours=cache_duration_hours)
14
15 def get(self, username: str) -> Optional[bool]:
16 """Get cached DM status if still valid"""
17 if username in self.cache:
18 status, timestamp = self.cache[username]
19 if datetime.now() - timestamp < self.cache_duration:
20 return status
21 return None
22
23 def set(self, username: str, dm_status: bool):
24 """Cache DM status with timestamp"""
25 self.cache[username] = (dm_status, datetime.now())
26
27 def invalidate(self, username: str):
28 """Invalidate cache for specific user"""
29 if username in self.cache:
30 del self.cache[username]
31
32 def clear(self):
33 """Clear entire cache"""
34 self.cache.clear()
35
36 def get_stats(self) -> Dict:
37 """Get cache statistics"""
38 now = datetime.now()
39 valid_entries = sum(
40 1 for _, timestamp in self.cache.values()
41 if now - timestamp < self.cache_duration
42 )
43
44 return {
45 'total_entries': len(self.cache),
46 'valid_entries': valid_entries,
47 'expired_entries': len(self.cache) - valid_entries
48 }
49
50# Usage with cache
51cache = DMStatusCache(cache_duration_hours=24)
52
53def check_dm_status_with_cache(
54 usernames: List[str],
55 api_token: str,
56 use_cache: bool = True
57) -> List[Dict]:
58 """Check DM status with caching support"""
59
60 uncached_users = []
61 results = []
62 cache_hits = 0
63
64 # Check cache first
65 if use_cache:
66 for username in usernames:
67 cached_status = cache.get(username)
68 if cached_status is not None:
69 results.append({
70 'username': username,
71 'dm_status': cached_status,
72 'cached': True
73 })
74 cache_hits += 1
75 else:
76 uncached_users.append(username)
77 else:
78 uncached_users = usernames
79
80 # Fetch uncached users from API
81 if uncached_users:
82 print(f"🔍 Checking {len(uncached_users)} users via API...")
83 api_result = check_dm_status(uncached_users, api_token)
84
85 if api_result and 'data' in api_result:
86 for user_data in api_result['data']:
87 # Cache the result
88 cache.set(user_data['username'], user_data['dm_status'])
89 user_data['cached'] = False
90 results.append(user_data)
91
92 if use_cache and cache_hits > 0:
93 print(f"⚡ Cache hits: {cache_hits}/{len(usernames)} ({cache_hits/len(usernames)*100:.1f}%)")
94 print(f"💰 Saved {cache_hits} API calls")
95
96 return results
97
98# Example usage
99usernames = ["elonmusk", "billgates", "twitter"]
100
101# First call - fetches from API
102print("=== First Check ===")
103results1 = check_dm_status_with_cache(usernames, API_TOKEN)
104
105# Second call - uses cache
106print("\\n=== Second Check (cached) ===")
107results2 = check_dm_status_with_cache(usernames, API_TOKEN)
108
109# Check cache stats
110stats = cache.get_stats()
111print(f"\\n📊 Cache Stats: {stats}")3. エラーハンドリングとリトライ
堅牢なエラーハンドリングを実装:
1import time
2from typing import Optional, Dict, List
3
4def check_dm_status_with_retry(
5 usernames: List[str],
6 api_token: str,
7 max_retries: int = 3,
8 retry_delay: int = 2
9) -> Optional[Dict]:
10 """
11 Check DM status with automatic retry on failure
12
13 Args:
14 usernames: List of usernames to check
15 api_token: API bearer token
16 max_retries: Maximum number of retry attempts
17 retry_delay: Delay between retries in seconds
18
19 Returns:
20 API response or None if all retries failed
21 """
22 url = "https://api.twexapi.io/dm/status"
23 headers = {
24 "Authorization": f"Bearer {api_token}",
25 "Content-Type": "application/json"
26 }
27
28 for attempt in range(max_retries):
29 try:
30 print(f"🔄 Attempt {attempt + 1}/{max_retries}...")
31
32 response = requests.post(
33 url,
34 json=usernames,
35 headers=headers,
36 timeout=15
37 )
38
39 if response.status_code == 200:
40 print(f"✅ Success on attempt {attempt + 1}")
41 return response.json()
42
43 elif response.status_code == 429:
44 # Rate limit - wait longer
45 wait_time = retry_delay * (attempt + 1) * 2
46 print(f"⏱️ Rate limit hit, waiting {wait_time}s...")
47 time.sleep(wait_time)
48 continue
49
50 elif response.status_code == 401:
51 # Auth error - don't retry
52 print("❌ Authentication error: Check your API token")
53 return None
54
55 elif response.status_code == 422:
56 # Validation error - don't retry
57 print("❌ Validation error: Check username format")
58 print(response.text)
59 return None
60
61 else:
62 print(f"⚠️ HTTP {response.status_code}: {response.text}")
63
64 except requests.exceptions.Timeout:
65 print(f"⏱️ Request timeout on attempt {attempt + 1}")
66
67 except requests.exceptions.ConnectionError:
68 print(f"🔌 Connection error on attempt {attempt + 1}")
69
70 except requests.exceptions.RequestException as e:
71 print(f"❌ Request error: {e}")
72
73 except Exception as e:
74 print(f"❌ Unexpected error: {e}")
75 return None
76
77 # Wait before retry (except on last attempt)
78 if attempt < max_retries - 1:
79 wait = retry_delay * (2 ** attempt) # Exponential backoff
80 print(f"💤 Waiting {wait}s before retry...")
81 time.sleep(wait)
82
83 print(f"❌ All {max_retries} attempts failed")
84 return None
85
86# Example usage with retry
87usernames = ["elonmusk", "billgates", "twitter"]
88result = check_dm_status_with_retry(
89 usernames,
90 API_TOKEN,
91 max_retries=3,
92 retry_delay=2
93)セキュリティのベストプラクティス
Answer: セキュリティのベストプラクティスとは、この事例で api.twexapi.io の TwexAPI Bearer API を使う手順を指します(読み取り約 14 Credits/回、Pro で約 $0.14/1K、20+ QPS)。公式の $5〜$15/1K や 15 分 300 回制限より運用しやすいです。
API トークンの保護
1import os
2from dotenv import load_dotenv
3
4# Load environment variables from .env file
5load_dotenv()
6
7# Get API token from environment variable
8API_TOKEN = os.getenv('TWITTERXAPI_TOKEN')
9
10if not API_TOKEN:
11 raise ValueError("TWITTERXAPI_TOKEN not found in environment variables")
12
13# Use the token
14result = check_dm_status(["elonmusk"], API_TOKEN).env ファイルを作成:
# TwitterXApi Configuration
TWITTERXAPI_TOKEN=your_actual_token_here.gitignore に追加:
.env
*.env
.env.local
.env.productionAPI トークンの取得
Answer: API トークンの取得は本ガイドの TwexAPI エンドポイントを Bearer で呼び出して実装します。バッチ/ページングで約 14 Credits/回・20+ QPS です。
TwitterXApi を使うには Bearer token が必要です:
- TwitterXApi.com で登録
- ダッシュボードへ移動
- 新しい API キーを作成
- Bearer token をコピー
- 環境変数に安全に保存
実践的な実装例
Answer: 実践的な実装例は本ガイドの TwexAPI エンドポイントを Bearer で呼び出して実装します。バッチ/ページングで約 14 Credits/回・20+ QPS です。
本番投入可能な完全な実装例:
1"""
2TwitterXApi DM Status Checker
3A production-ready implementation with all best practices
4"""
5
6import os
7import requests
8import time
9import logging
10from datetime import datetime, timedelta
11from typing import List, Dict, Optional
12from dataclasses import dataclass
13from dotenv import load_dotenv
14
15# Configure logging
16logging.basicConfig(
17 level=logging.INFO,
18 format='%(asctime)s - %(levelname)s - %(message)s'
19)
20logger = logging.getLogger(__name__)
21
22@dataclass
23class DMStatusResult:
24 """Data class for DM status results"""
25 username: str
26 dm_status: bool
27 checked_at: datetime
28 cached: bool = False
29
30class DMStatusChecker:
31 """
32 Production-ready DM status checker with caching,
33 error handling, and retry logic
34 """
35
36 def __init__(
37 self,
38 api_token: Optional[str] = None,
39 cache_duration_hours: int = 24,
40 max_retries: int = 3
41 ):
42 """
43 Initialize DM status checker
44
45 Args:
46 api_token: TwitterXApi bearer token (or from env)
47 cache_duration_hours: Cache validity duration
48 max_retries: Maximum retry attempts
49 """
50 # Load API token
51 if api_token is None:
52 load_dotenv()
53 api_token = os.getenv('TWITTERXAPI_TOKEN')
54
55 if not api_token:
56 raise ValueError("API token required")
57
58 self.api_token = api_token
59 self.url = "https://api.twexapi.io/dm/status"
60 self.cache = {}
61 self.cache_duration = timedelta(hours=cache_duration_hours)
62 self.max_retries = max_retries
63
64 logger.info("DMStatusChecker initialized")
65
66 def check(
67 self,
68 usernames: List[str],
69 use_cache: bool = True
70 ) -> List[DMStatusResult]:
71 """
72 Check DM status for users
73
74 Args:
75 usernames: List of Twitter usernames
76 use_cache: Whether to use cached results
77
78 Returns:
79 List of DMStatusResult objects
80 """
81 if not usernames:
82 logger.warning("Empty username list provided")
83 return []
84
85 logger.info(f"Checking DM status for {len(usernames)} users")
86
87 # Separate cached and uncached users
88 results = []
89 uncached = []
90
91 if use_cache:
92 for username in usernames:
93 cached_result = self._get_from_cache(username)
94 if cached_result:
95 results.append(cached_result)
96 else:
97 uncached.append(username)
98
99 if results:
100 logger.info(f"Cache hits: {len(results)}/{len(usernames)}")
101 else:
102 uncached = usernames
103
104 # Fetch uncached users from API
105 if uncached:
106 api_results = self._fetch_from_api(uncached)
107 if api_results:
108 results.extend(api_results)
109
110 logger.info(f"Total results: {len(results)}")
111 return results
112
113 def _get_from_cache(self, username: str) -> Optional[DMStatusResult]:
114 """Get result from cache if valid"""
115 if username in self.cache:
116 result, timestamp = self.cache[username]
117 if datetime.now() - timestamp < self.cache_duration:
118 return DMStatusResult(
119 username=username,
120 dm_status=result,
121 checked_at=timestamp,
122 cached=True
123 )
124 return None
125
126 def _set_cache(self, username: str, dm_status: bool):
127 """Store result in cache"""
128 self.cache[username] = (dm_status, datetime.now())
129
130 def _fetch_from_api(
131 self,
132 usernames: List[str]
133 ) -> List[DMStatusResult]:
134 """Fetch DM status from API with retry logic"""
135 headers = {
136 "Authorization": f"Bearer {self.api_token}",
137 "Content-Type": "application/json"
138 }
139
140 for attempt in range(self.max_retries):
141 try:
142 logger.info(f"API request attempt {attempt + 1}/{self.max_retries}")
143
144 response = requests.post(
145 self.url,
146 json=usernames,
147 headers=headers,
148 timeout=15
149 )
150
151 if response.status_code == 200:
152 data = response.json()
153 results = []
154
155 for user_data in data.get('data', []):
156 username = user_data['username']
157 dm_status = user_data['dm_status']
158
159 # Cache the result
160 self._set_cache(username, dm_status)
161
162 # Create result object
163 results.append(DMStatusResult(
164 username=username,
165 dm_status=dm_status,
166 checked_at=datetime.now(),
167 cached=False
168 ))
169
170 logger.info(f"Successfully fetched {len(results)} results")
171 return results
172
173 elif response.status_code == 429:
174 wait_time = 60 * (attempt + 1)
175 logger.warning(f"Rate limit hit, waiting {wait_time}s")
176 time.sleep(wait_time)
177
178 elif response.status_code in [401, 422]:
179 logger.error(f"API error {response.status_code}: {response.text}")
180 return []
181
182 else:
183 logger.warning(f"HTTP {response.status_code}: {response.text}")
184
185 except requests.exceptions.RequestException as e:
186 logger.error(f"Request failed: {e}")
187
188 # Exponential backoff
189 if attempt < self.max_retries - 1:
190 wait = 2 ** attempt
191 logger.info(f"Waiting {wait}s before retry")
192 time.sleep(wait)
193
194 logger.error("All retry attempts failed")
195 return []
196
197 def get_cache_stats(self) -> Dict:
198 """Get cache statistics"""
199 now = datetime.now()
200 valid = sum(
201 1 for _, timestamp in self.cache.values()
202 if now - timestamp < self.cache_duration
203 )
204
205 return {
206 'total_entries': len(self.cache),
207 'valid_entries': valid,
208 'expired_entries': len(self.cache) - valid
209 }
210
211 def clear_cache(self):
212 """Clear the cache"""
213 self.cache.clear()
214 logger.info("Cache cleared")
215
216# Example usage
217def main():
218 """Example usage of DMStatusChecker"""
219
220 # Initialize checker
221 checker = DMStatusChecker(
222 cache_duration_hours=24,
223 max_retries=3
224 )
225
226 # Test users
227 usernames = [
228 "elonmusk", "billgates", "twitter",
229 "nasa", "openai", "github"
230 ]
231
232 # Check DM status
233 results = checker.check(usernames, use_cache=True)
234
235 # Display results
236 print("\\n" + "="*60)
237 print("DM STATUS REPORT".center(60))
238 print("="*60 + "\\n")
239
240 open_count = 0
241 closed_count = 0
242
243 for result in results:
244 status_emoji = "✅" if result.dm_status else "❌"
245 status_text = "Open" if result.dm_status else "Closed"
246 cache_indicator = "📦" if result.cached else "🌐"
247
248 print(f"{status_emoji} @{result.username:15} DMs: {status_text:8} {cache_indicator}")
249
250 if result.dm_status:
251 open_count += 1
252 else:
253 closed_count += 1
254
255 # Summary
256 print("\\n" + "-"*60)
257 print(f"Total Users: {len(results)}")
258 print(f"DMs Open: {open_count} ({open_count/len(results)*100:.1f}%)")
259 print(f"DMs Closed: {closed_count} ({closed_count/len(results)*100:.1f}%)")
260 print("-"*60)
261
262 # Cache stats
263 cache_stats = checker.get_cache_stats()
264 print(f"\\nCache: {cache_stats['valid_entries']} valid entries")
265 print("="*60)
266
267if __name__ == "__main__":
268 main()まとめ
TwitterXApi の DM Status エンドポイントは、Twitter 自動化、SNS 管理、カスタマーエンゲージメントアプリを構築する人に不可欠です。メッセージ送信前に DM ステータスを確認すれば:
- ✅ 効率向上:失敗する送信試行を回避
- 💰 コスト削減:不要な API 呼び出しを最小化
- 🎯 ターゲティング改善:正確な連絡先情報
- 🚀 自動化強化:より賢いボットロジック
- 📊 分析向上:DM 可用性の追跡
要点
- バッチ処理:常に複数ユーザーを一度に確認
- キャッシュ:API 呼び出し削減のためキャッシュ実装
- エラーハンドリング:リトライと適切なエラー処理
- セキュリティ:API トークンは環境変数に保存
- 監視:DM ステータス変化を時系列で追跡
- 統合:CRM とマーケティングツールと連携
はじめに
- TwitterXApi.com に登録
- ダッシュボードで Bearer token を取得
- 必要パッケージ(
requests、python-dotenv)をインストール - 基本チェック機能を実装
- キャッシュとエラーハンドリングを追加
- 本番利用へスケール
次のステップ
- 他の TwitterXApi エンドポイントで包括的自動化
- 統合 SNS 管理ダッシュボードの構築
- リアルタイム監視システムの実装
- インテリジェントチャットボットと CS ツールの作成
詳細は次をご覧ください:
TwitterXApi の DM ステータス確認で、よりスマートな Twitter 自動化を今日から始めましょう!
本ガイドは 2025 年 1 月時点の TwitterXApi ドキュメントに基づきます。最新情報は常に公式ドキュメントを参照してください。