linkchanbot (22036B)
#!/usr/bin/python
"""
This is a Telegram Bot which sanitises and substitutes share links
for lightweight, privacy respecting proxy alternatives.
"""

from urllib.parse import urlparse, urlencode, parse_qs
import logging.handlers
import argparse
import configparser
import functools
import html
import json
import os
import pathlib
import random
import shutil
import signal
import sys
import threading

from telegram import (
    MessageEntity, ParseMode,
    InlineQueryResultArticle, InputTextMessageContent,
    InlineKeyboardMarkup, InlineKeyboardButton,
)
from telegram.ext import (
    Updater, Filters,
    MessageHandler, CommandHandler,
    InlineQueryHandler, ChosenInlineResultHandler,
)
from telegram.constants import MAX_INLINE_QUERY_RESULTS as MAX_RESULTS
from telegram import error


# Constants

VERSION = "1.0.0"

# Reply body, sent with ParseMode.HTML. Values substituted into it must be
# HTML-escaped by the caller.
TEMPLATE = """
{new}
<a href="{old}">source</a>
"""

EXAMPLES = {
    "Twitter": [
        "https://twitter.com/BeautyOfSpace/status/332490279376519169",
        "https://twitter.com/anvxmes/status/1375175567587356673",
        "https://twitter.com/Chrisvb700/status/1373169970117496833",
    ],
    "YouTube": [
        "https://www.youtube.com/watch?v=J---aiyznGQ#",
        "https://www.youtube.com/watch?v=KmtzQCSh6xk",
        "https://www.youtube.com/watch?v=9Gj47G2e1Jc",
    ],
    "Instagram": [
        "https://www.instagram.com/p/B-b-POVFb1r/",
        "https://www.instagram.com/p/CMW0Fx6lum6/",
        "https://www.instagram.com/p/CL_vMidl_W2/",
    ],
    "Reddit": [
        "https://www.reddit.com/r/wallpaper/comments/mctm44/dope19201080/",
        "https://www.reddit.com/r/wallpaper/comments/m98fnz/great_art_by_mike_fazbear_3840x2160/",
        "https://www.reddit.com/r/reddit.com/comments/17913/reddit_now_supports_comments/c51/",
    ],
    "Medium": [
        "https://medium.com/@nikitonsky/medium-is-a-poor-choice-for-blogging-bb0048d19133",
        "https://medium.com/@ftrain/big-data-small-effort-b62607a43a8c",
        "https://medium.com/@swalahamani/the-art-of-computer-programming-9fbd8fd56265",
    ],
}

# Populated by init() from alts.json and services.json. Defined here so the
# module can be imported (and mk_newlinks() called) before init() has run.
ALTS = {}
SERVICES = {}

# Upper bound for the memoisation caches below. Their keys come from
# arbitrary user input, so an unbounded cache would grow without limit.
_CACHE_SIZE = 1024


# Initialisation

def args():
    """
    Parse command-line arguments. Provide basic help interface.

    Returns the parsed argparse namespace. With --version, prints the
    version to stderr and exits with status 0.
    """
    parser = argparse.ArgumentParser(
        prog="linkchanbot",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=(
            "A Telegram bot that substitutes common share link with\n"
            "lightweight, privacy respecting proxy alternatives."
        ),
        epilog=f"linkchanbot {VERSION}",
    )
    parser.add_argument(
        '-v', '--version', help='print version and exit', action='store_true',
    )
    parser.add_argument('-l', '--logfile', help='specify the log file')

    parsed = parser.parse_args()

    if parsed.version:
        stderr(f"linkchanbot {VERSION}")
        sys.exit(0)

    return parsed


def _xdg_dir(env_var, home_subdir):
    """
    Returns the directory named by `env_var`, or ~/`home_subdir` when the
    variable is unset or empty.
    """
    base = os.getenv(env_var)
    if base:
        return pathlib.Path(base)
    # Path.home() does not depend solely on $HOME being set, unlike
    # string concatenation with os.getenv('HOME').
    return pathlib.Path.home()/home_subdir


def _load_json_config(path):
    """
    Loads and returns the JSON document at `path`.
    Prints an error and exits with status 1 if the file is missing
    or is not valid JSON.
    """
    try:
        with open(path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except FileNotFoundError as e:
        stderr("Error: Missing config file:", e)
        sys.exit(1)
    except json.JSONDecodeError as e:
        stderr(f"Error: JSON syntax error in '{path}':", e)
        sys.exit(1)


def init(args):
    """
    Loads configuration from config files and environment variables.
    To be called before main logic.

    `args` is the namespace returned by args(); only `args.logfile`
    is read.

    Returns the tuple (TOKEN, ADMIN, LOGFILE). ADMIN is falsy when no
    admin is configured. Exits with status 1 on any configuration error.

    Has side effects: creates the cache and config directories, copies
    missing config files from the system share directory, configures
    the root logger, and sets the globals ALTS and SERVICES.
    """
    global ALTS
    global SERVICES

    # Filesystem
    cache_dir = _xdg_dir('XDG_CACHE_HOME', '.cache')/'linkchan'
    cache_dir.mkdir(parents=True, exist_ok=True)  # EFFECT

    config_dir = _xdg_dir('XDG_CONFIG_HOME', '.config')/'linkchan'
    config_dir.mkdir(parents=True, exist_ok=True)  # EFFECT

    sys_share_dir = pathlib.Path('/usr/local/share/linkchan')

    config_files = ('bot.cfg', 'alts.json', 'services.json')

    # Copy system global config files to local XDG config dir.
    # Fail if files not found.
    for name in config_files:
        locfile = config_dir/name
        if locfile.is_file():
            continue

        sysfile = sys_share_dir/name
        if not sysfile.is_file():
            stderr(
                f"Error: config file '{name}' not found in "
                f"'{config_dir}' or '{sys_share_dir}'",
            )
            sys.exit(1)

        shutil.copy(sysfile, locfile)  # EFFECT

    # Logging
    LOGFILE = args.logfile or os.getenv('LINKCHAN_LOGFILE') or cache_dir/'log'

    try:
        handler = logging.handlers.WatchedFileHandler(LOGFILE)
    except OSError as e:
        # Covers a missing parent directory as well as permission
        # problems or the path being a directory.
        stderr("Error: logfile:", e)
        sys.exit(1)

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
        handlers=(handler,),
    )

    # Config. Environment variables take precedence over bot.cfg.
    bot_cfg = configparser.ConfigParser()
    bot_cfg.read(config_dir/'bot.cfg')

    TOKEN = os.getenv('LINKCHAN_TOKEN') or bot_cfg.get("auth", "token", fallback=None)
    ADMIN = os.getenv('LINKCHAN_ADMIN') or bot_cfg.get("auth", "admin", fallback=None)

    if not TOKEN:
        stderr("Error: No bot token provided")
        sys.exit(1)

    ALTS = _load_json_config(config_dir/'alts.json')
    SERVICES = _load_json_config(config_dir/'services.json')

    # Validate ALTS. Entries without a 'service' never match in
    # mk_newlinks(), so they are only warned about here.
    for altsite, alt in ALTS.items():
        if 'service' not in alt:
            logging.warning(
                f"alts.json: '{altsite}' has no 'service' value, ignored"
            )

    return TOKEN, ADMIN, LOGFILE


# Util

def stderr(*args, **kwargs):
    """
    Prints to stderr. Accepts the same arguments as print(),
    except `file`.
    """
    print(*args, **kwargs, file=sys.stderr)


def logged(old_cb_func):
    """
    Wraps callback functions, logs incoming telegram updates.

    The status line is written to the log and printed to stdout before
    the wrapped callback runs. The wrapped callback's return value is
    passed through.
    """
    @functools.wraps(old_cb_func)
    def new_cb_func(upd, ctx, **kwargs):
        if upd.message and upd.message.text:
            status = mk_status(upd, 'msg', '<:', oneline(upd.message.text))
        elif upd.message and upd.message.caption:
            status = mk_status(upd, 'cap', '<:', oneline(upd.message.caption))
        elif upd.message:
            status = mk_status(upd, 'msg', '#:', upd.effective_message)
        elif upd.chosen_inline_result:
            status = mk_status(upd, 'cir', '::', oneline(upd.chosen_inline_result.result_id))
        elif upd.inline_query:
            status = mk_status(upd, 'ilq', '?:', oneline(upd.inline_query.query))
        elif upd.effective_message:
            status = mk_status(upd, 'ukn', '#:', upd.effective_message)
        else:
            status = mk_status(upd, 'ukn', '#:')

        logging.info(status)
        print(status)

        return old_cb_func(upd, ctx, **kwargs)

    return new_cb_func


def mk_status(upd, utype, dl='::', text=None):
    """
    Prepares a standardised string for logging.
    Called by wrapped callbacks (see @logged)
    or by callbacks for terminal output.

    upd     The telegram update being reported.
    utype   Short tag for the kind of update (e.g. 'msg', 'ilq').
    dl      Delimiter placed before the text.
    text    Text to report. When falsy, the update's effective message
            is used, or "?" if there is none.
    """
    uid = upd.update_id

    if upd.effective_user:
        user_id = upd.effective_user.id
        user_name = upd.effective_user.name
    elif upd.channel_post and upd.channel_post.from_user:
        user_id = upd.channel_post.from_user.id
        user_name = upd.channel_post.from_user.name
    elif upd.poll:
        user_id = '<poll>'
        user_name = '<poll>'
    else:
        user_id = '<unknown>'
        user_name = '<unknown>'

    chat = upd.effective_chat
    if chat:
        chat_id = chat.id
        try:
            chat_name = chat.link or chat.title or chat.full_name
        except AttributeError:
            chat_name = None
        # A chat may expose none of link/title/full_name; fall back to
        # the same placeholder used when there is no chat at all.
        if chat_name:
            chat_name = chat_name.replace('https://t.me/', '@')
        else:
            chat_name = '#'
    else:
        chat_id = '#'
        chat_name = '#'

    if not text:
        if upd.effective_message:
            text = upd.effective_message
        else:
            text = "?"

    return f"{uid} [{utype}] - {user_id} <{user_name}> - {chat_id} ({chat_name}) - {dl} {text}"


@functools.lru_cache(maxsize=_CACHE_SIZE)
def mk_newlinks(link):
    """
    The core logic of link substitution.
    Given a link, returns either:
        [str...]    A non-empty list of new links.
        [False]     A list with a single False element, when the link
                    is malformed, its service is unrecognised, or no
                    alt is configured for its service.

    The returned list is cached and shared between calls;
    callers must not mutate it.
    """
    # Prepare and parse link string
    if not link.startswith(('https://', 'http://')):
        link = 'https://' + link

    try:
        url = urlparse(link)
    except ValueError:
        # urlparse rejects some malformed input (e.g. a bad IPv6 literal).
        return [False]

    # Enforce HTTPS
    url = url._replace(scheme='https')

    # Recognise service. Hostnames are case-insensitive.
    netloc = url.netloc.lower()
    if netloc in SERVICES:
        service = netloc
    else:
        for main_domain, service_data in SERVICES.items():
            if netloc in service_data.get('alt_domains', ()):
                service = main_domain
                break
        else:
            # Fail if service is unrecognised
            return [False]

    # Keep only allowed URL queries
    allowed_queries = SERVICES[service].get('query_whitelist') or []
    kept_queries = {
        query: values
        for query, values in parse_qs(url.query, keep_blank_values=True).items()
        if query in allowed_queries
    }
    url = url._replace(query=urlencode(kept_queries, doseq=True))

    # Make new substitutes, one per alt which replaces `service`
    newlinks = [
        url._replace(netloc=altsite).geturl()
        for altsite, alt in ALTS.items()
        if alt.get('service') == service
    ]

    # Callers index element 0, so never hand back an empty list.
    return newlinks or [False]


@functools.lru_cache(maxsize=_CACHE_SIZE)
def oneline(s: str) -> str:
    """
    Converts newlines and tabs to ASCII representations.
    Backslashes are doubled first so the result is unambiguous.
    """
    s = s.replace('\\', '\\\\')
    return s.replace('\n', '\\n').replace('\t', '\\t')


def _md_bot_username(ctx):
    """
    Returns the bot's username with underscores backslash-escaped,
    for use in Markdown message text.
    """
    return ctx.bot.get_me().username.replace('_', '\\_')


# Callback Handlers

@logged
def cb_start(upd, ctx):
    """
    /start callback
    """

    # If user pressed "See examples", they were sent to bot PMs
    # to /start with the payload "examples".
    if ctx.args and ctx.args[0] == 'examples':
        examples(upd, ctx)
        return

    bot_username = _md_bot_username(ctx)

    # Outgoing text
    msg = f"""
@{bot_username} cleans & proxies your share links.
I support _Twitter_, _YouTube_, _Instagram_, _Reddit_, and _Medium_.

*Try inline*
Type: `@{bot_username} <link>`
[See examples](t.me/{bot_username}?start=examples)

*Try bot PMs*
[Send me](t.me/{bot_username}) a link

*Try group chats*
[Add me](t.me/{bot_username}?startgroup=1) and share links

See /help, /about or @linkchan\\_updates
"""

    # Inline keyboard with "Try inline" button.
    # See: https://core.telegram.org/bots/api#inlinekeyboardbutton
    reply_markup = InlineKeyboardMarkup([
        [
            InlineKeyboardButton(
                'Try inline',
                # Launches inline mode on button press with no query
                switch_inline_query_current_chat='',
            ),
        ],
    ])

    # Send message
    upd.message.reply_text(
        msg,
        disable_web_page_preview=True,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=reply_markup,
    )


def cb_help(upd, ctx):
    """
    /help callback
    """
    bot_username = _md_bot_username(ctx)

    # Outgoing text
    msg = f"""
*DESCRIPTION*
@{bot_username} substitutes the share links of popular services for lightweight and privacy respecting alternatives, and sanitises unnecesary queries and trackers.

*USAGE*
See /start

*SUPPORTED SERVICES*
- twitter.com => Nitter
- youtube.com => Inividious, CloudTube
- instagram.com => Bibliogram
- reddit.com => Teddit, Libreddit, Old Reddit
- medium.com => Scribe

*NOTES*
For in-chat replies, default proxies are used. For inline queries, a menu of proxies are available. To cycle through proxy menus, append '#' to your link.

All URL query parameters for all domains are removed, except for whitelisted queries per service.

*PRIVACY*
This bot only logs messages with links and inline queries.

This bot receives no chat data when used in inline mode, only the user data of the user who is using inline mode. Use inline mode for maximum privacy.

*CONTRIBUTING*
Have a suggestion or bug report?
See /about or @linkchan\\_updates.
"""

    # Send message
    upd.message.reply_text(
        msg,
        disable_web_page_preview=True,
        parse_mode=ParseMode.MARKDOWN,
    )


@logged
def cb_about(upd, ctx):
    """
    /about callback
    """
    bot_username = _md_bot_username(ctx)

    # Outgoing text
    msg = f"""
@{bot_username} (@linkchan\\_updates)

Version
    {VERSION}
Source code
    https://sr.ht/~torresjrjr/linkchanbot
Maintainer
    @torresjrjr <b@torresjrjr.com>
License
    GNU Affero General Public License
"""

    # Send message
    upd.message.reply_text(
        msg,
        parse_mode=ParseMode.MARKDOWN,
    )


def examples(upd, ctx):
    """
    Replies with an inline keyboard of examples of inline queries.
    Called when user sends /start with payload "examples".
    See cb_start().
    """

    # Inline keyboard with a button for each example in `EXAMPLES`.
    # See: https://core.telegram.org/bots/api#inlinekeyboardbutton
    reply_markup = InlineKeyboardMarkup([
        [
            InlineKeyboardButton(
                service,
                # Launches inline mode on button press
                # with example as the query.
                switch_inline_query_current_chat=links[0],
            )
        ]
        for service, links in EXAMPLES.items()
    ])

    # Send message
    upd.message.reply_text(
        "Try inline query examples",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=reply_markup,
    )


@logged
def cb_link_handler(upd, ctx):
    """
    Handles messages with links (see main > MessageHandler).
    Replies with `TEMPLATE` with new links, one reply per
    substitutable link.
    """
    if not upd.message:
        # Will have been logged as [ukn] in @logged
        return

    # Telegram returns message metadata called 'entities'
    # (commands, hashtags, mentions, formatted text, links, etc.).
    # We extract the link entities.
    entities = {}
    entities.update(upd.message.parse_entities())
    entities.update(upd.message.parse_caption_entities())

    links = []
    for ent, text in entities.items():
        if ent.type == MessageEntity.URL:
            links.append(oneline(text))
        elif ent.type == MessageEntity.TEXT_LINK:
            links.append(ent.url)

    # Send substitutes as separate messages
    for oldlink in links:
        # mk_newlinks() returns either [str...] or [False];
        # the first suggestion is the default proxy.
        newlink = mk_newlinks(oldlink)[0]
        if not newlink:
            continue

        logging.info(mk_status(upd, 'out', '::', f"{newlink}"))

        # The message is parsed as HTML, so links taken from user
        # text must be escaped before interpolation.
        msg = TEMPLATE.format(
            new=html.escape(newlink),
            old=html.escape(oldlink),
        )
        upd.message.reply_text(msg, parse_mode=ParseMode.HTML)


@logged
def cb_inline_query(upd, ctx):
    """
    Handles inline queries. Sends back prompt menu of new links.
    """
    query = upd.inline_query.query
    newlinks = mk_newlinks(query)

    # If the query string is not a substitutable URL,
    # return a menu of a random sample of alts.
    if query == '' or not newlinks[0]:
        nr_results = min(len(ALTS), MAX_RESULTS)

        results = [
            InlineQueryResultArticle(
                id=altsite,
                title=altsite,
                url=altsite,
                description=alt.get('description', 'Alt service'),
                thumb_url=alt.get('thumb_url'),
                input_message_content=InputTextMessageContent(altsite),
            )
            for altsite, alt in random.sample(
                sorted(ALTS.items()), nr_results
            )
        ]
    # Otherwise, return a menu of a random sample of newlinks
    # and their alt metadata to populate the inline results menu.
    else:
        alts = {
            newlink: ALTS[urlparse(newlink).netloc]
            for newlink in newlinks
        }

        nr_results = min(len(alts), MAX_RESULTS)

        results = [
            InlineQueryResultArticle(
                id=f"{upd.update_id}+{urlparse(newlink).netloc}",
                title=urlparse(newlink).netloc,
                url=newlink,
                description=alt.get('description', 'Alt service'),
                thumb_url=alt.get('thumb_url'),
                input_message_content=InputTextMessageContent(
                    # Parsed as HTML; escape the user-supplied query
                    # and the derived link.
                    TEMPLATE.format(
                        new=html.escape(newlink),
                        old=html.escape(query),
                    ),
                    parse_mode=ParseMode.HTML,
                ),
            )
            for newlink, alt in random.sample(
                sorted(alts.items()), nr_results
            )
        ]

    bot_username = ctx.bot.get_me().username

    # Answer inline query
    upd.inline_query.answer(
        results,
        # switch_pm_* adds a button to the inline results menu
        # to open the bot chat.
        # See: https://core.telegram.org/bots/api#answerinlinequery
        switch_pm_text=f"Open @{bot_username}",
        switch_pm_parameter='inline',
    )


@logged
def cb_chosen_inline_result(upd, ctx):
    """
    Callback for chosen inline query results. For logging only.
    See @logged
    """
    pass


# Ordered most-derived first: BadRequest and TimedOut subclass
# NetworkError, and every class here subclasses TelegramError,
# so the base classes must be tested last.
_ERROR_LABELS = (
    (error.Unauthorized, "Unauthorized"),
    (error.InvalidToken, "InvalidToken"),
    (error.BadRequest, "BadRequest"),
    (error.TimedOut, "TimedOut"),
    (error.NetworkError, "NetworkError"),
    (error.ChatMigrated, "ChatMigrated"),
    (error.RetryAfter, "RetryAfter"),
    (error.Conflict, "Conflict"),
    (error.TelegramError, "TelegramError"),
)


def cb_error(update, context):
    """
    Error callback. Prints Telegram errors, labelled by their most
    specific type, together with the update that caused them.
    Errors which are not Telegram errors are re-raised.
    """
    err = context.error

    for err_type, label in _ERROR_LABELS:
        if isinstance(err, err_type):
            print(f"Error: {label}:", err, update)
            return

    raise err


# Main

def main():
    """
    Entry point. Loads configuration, registers handlers
    and serves until the bot is stopped.
    """
    TOKEN, ADMIN, LOGFILE = init(args())

    # Init bot
    try:
        updater = Updater(TOKEN, use_context=True)
    except error.InvalidToken:
        stderr(f"Error: Invalid token '{TOKEN}'")
        sys.exit(1)

    # Test token
    try:
        bot_user = updater.bot.get_me()
    except error.Unauthorized as e:
        stderr("Error: Faulty token:", e)
        sys.exit(1)

    BOT_USERNAME = bot_user.username
    BOT_ID = bot_user.id

    dp = updater.dispatcher
    dp.add_error_handler(cb_error)

    dp.add_handler(CommandHandler('start', cb_start))
    dp.add_handler(CommandHandler('help', cb_help))
    dp.add_handler(CommandHandler('about', cb_about))

    dp.add_handler(InlineQueryHandler(cb_inline_query))
    dp.add_handler(ChosenInlineResultHandler(cb_chosen_inline_result))

    dp.add_handler(MessageHandler(
        (
            # non-edited messages
            Filters.update.message | Filters.update.channel_post
        ) & (
            # messages with links
            Filters.entity(MessageEntity.URL) |
            Filters.entity(MessageEntity.TEXT_LINK) |
            Filters.caption_entity(MessageEntity.URL) |
            Filters.caption_entity(MessageEntity.TEXT_LINK)
        ) & (
            # not messages created via this bot (inline queries, etc.)
            ~Filters.via_bot(username=BOT_USERNAME)
        ) & ~(
            # not messages from bot forwarded to group chats
            # (forwarded to bot's private chat is OK)
            Filters.forwarded_from(username=BOT_USERNAME)
            & ~Filters.chat_type.private
        ),
        cb_link_handler
    ))

    if ADMIN:
        # Admin callbacks
        # See: https://github.com/python-telegram-bot/python-telegram-bot/wiki/Code-snippets/1c6ab0d3324a83de2a0a41910491211be2ffb46b#simple-way-of-restarting-the-bot
        def stop_and_restart():
            """
            Gracefully stop the updater
            and replace the current process with a new one.
            Called by cb_restart().
            """
            # `updater` in scope of function definition
            updater.stop()
            return os.execl(sys.executable, sys.executable, *sys.argv)

        @logged
        def cb_restart(upd, ctx):
            """
            /restart callback. Restarts the bot.
            See handler for authorisation.
            """
            status = mk_status(upd, 'cmd', '::', "Authorised - restarting bot...")
            logging.info(status)
            print(status)
            upd.message.reply_text(status)

            # Restart from a separate thread, as in the snippet
            # linked above.
            return threading.Thread(target=stop_and_restart).start()

        @logged
        def cb_shutdown(upd, ctx):
            """
            /shutdown callback. Shuts down the bot.
            See handler for authorisation.
            """
            status = mk_status(upd, 'cmd', '::', "Authorised - shutdown SIGINT")
            logging.info(status)
            print(status)
            upd.message.reply_text(status)

            os.kill(os.getpid(), signal.SIGINT)

        # Admin handlers
        dp.add_handler(CommandHandler(
            'restart', cb_restart,
            filters=Filters.user(username=ADMIN)
        ))
        dp.add_handler(CommandHandler(
            'shutdown', cb_shutdown,
            filters=Filters.user(username=ADMIN)
        ))

    # Start serving
    stderr(f"linkchanbot {VERSION}")
    stderr(f"logfile: {LOGFILE}")
    stderr(f"bot: {BOT_ID} <@{BOT_USERNAME}>")
    stderr("Bot serving...")

    updater.start_polling()
    updater.idle()

    stderr("Bot stopped.")
    return


if __name__ == '__main__':
    main()