Matrix Python Robot

看了一眼 Matrix 的 bot SDK,随手写了个机器人:实现了用 Python 登录 Matrix 网络的功能,还增加了一个把消息保存到 Obsidian 的功能。

requirements.txt

1
nio==3.4.2

如果不好使,请手动安装:

1
2
pip install matrix-nio
pip install "matrix-nio[e2e]"
pip install requests

bot.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from importlib import util
import asyncio
from nio import AsyncClient, SyncResponse, RoomMessageText

# Command prefix: only messages whose body starts with this are handled.
response_string = "!bot"

# Shared client for the matrix.org homeserver, acting as "mybotaccount".
async_client = AsyncClient(homeserver="https://matrix.org", user="mybotaccount")

import json
import requests

def send_ob(str, timeout=10):
    """Send a piece of text to the Obsidian HTTP endpoint.

    The text is wrapped as ``{"content": <text>}``, serialized to JSON and
    POSTed together with the ``Token`` header.

    Args:
        str: Text to save. The name shadows the builtin; it is kept so that
            existing callers passing it by keyword keep working.
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without it a stalled server
            would block the caller indefinitely.

    Returns:
        The response body as text, regardless of the HTTP status code.

    Raises:
        requests.RequestException: If the connection fails or times out.
    """
    payload = json.dumps({"content": str})
    resp = requests.post(
        url="https://xxxxxxxx.cn-beijing.fcapp.run/ob/today",
        headers={
            "Content-Type": "application/json",
            "Token": "xxxxxxxxxxxxxx",
        },
        data=payload,
        timeout=timeout,
    )
    return resp.text

async def main():
    """Log in, then long-poll the homeserver and answer prefixed messages.

    For every event in a joined room whose ``body`` starts with
    ``response_string``, the text after the prefix is passed to ``send_ob``
    and the returned text is posted back to the same room as an ``m.text``
    message. The loop runs until an exception escapes; the client's HTTP
    session is closed on the way out.
    """
    try:
        response = await async_client.login("xxxxxxxxxx")
        print(response)
        if not async_client.logged_in:
            # Login returned an error response; syncing would only fail.
            return

        loop = asyncio.get_running_loop()
        while True:
            sync_response = await async_client.sync(30000)
            print(sync_response)  # note that this could be LARGE!
            if not isinstance(sync_response, SyncResponse):
                # A failed sync yields an error object without `.rooms`;
                # pause briefly so a persistent failure does not spin.
                await asyncio.sleep(5)
                continue

            for room_id, room in sync_response.rooms.join.items():
                for event in room.timeline.events:
                    body = getattr(event, "body", None)
                    if not isinstance(body, str):
                        continue
                    if not body.startswith(response_string):
                        continue
                    # Drop only the leading prefix; later occurrences of the
                    # prefix inside the message are part of the content.
                    response_body = body[len(response_string):].strip()
                    # send_ob does blocking HTTP I/O, so run it in a worker
                    # thread instead of stalling the event loop.
                    reply_text = await loop.run_in_executor(
                        None, send_ob, response_body
                    )
                    content = {"body": reply_text, "msgtype": "m.text"}
                    await async_client.room_send(
                        room_id, "m.room.message", content
                    )
    finally:
        # Release the underlying HTTP session on every exit path.
        await async_client.close()


# asyncio.run() creates a fresh event loop, runs main() to completion and
# closes the loop afterwards. Calling asyncio.get_event_loop() with no running
# loop is deprecated and no longer works on the newest Python releases.
asyncio.run(main())