Перейти к содержанию

Usage

С этим модулем вы можете создавать юзерботов или простых ботов с легкостью. Для создания простого хендлера вам нужно меньше 6 строк кода.

В этом примере вы можете видеть бота-группу с фильтром для события message_new с текстом hello и хендлером, который переделывает возвращаемую строку в ответ для API.

import logging
import asyncio

from vkwave.client import AIOHTTPClient
from vkwave.api import BotSyncSingleToken, Token, API
from vkwave.bots import (
    TokenStorage,
    Dispatcher,
    BotLongpollExtension,
    DefaultRouter,
    GroupId,
    EventTypeFilter,
)
from vkwave.types.bot_events import BotEventType
from vkwave.longpoll import BotLongpollData, BotLongpoll


logging.basicConfig(level=logging.DEBUG)
bot_token = Token("123")
gid = 456


async def main():
    client = AIOHTTPClient()
    token = BotSyncSingleToken(bot_token)
    api_session = API(token, client)
    api = api_session.get_context()
    lp_data = BotLongpollData(gid)
    longpoll = BotLongpoll(api, lp_data)
    token_storage = TokenStorage[GroupId]()
    dp = Dispatcher(api_session, token_storage)
    lp_extension = BotLongpollExtension(dp, longpoll)
    router = DefaultRouter()

    simple_handler = (
        router.registrar.new()
        .with_filters(
            lambda event: event.object.object.message.text == "hello",
            EventTypeFilter(BotEventType.MESSAGE_NEW),
        )
        .handle(lambda event: f"hello, {event.object.object.message.from_id}")
        .ready()
    )
    #  >> hello
    #  >> hello, 578716413

    router.registrar.register(simple_handler)

    dp.add_router(router)
    await dp.cache_potential_tokens()
    await lp_extension.start()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(main())
    loop.run_forever()

Другой путь создания хендлера.

...

class TextFilter(BaseFilter):
    async def check(self, event: BotEvent) -> FilterResult:
        return FilterResult(event.object.object.message.text == "hello")


class MyCallback(BaseCallback):
    def __init__(self, func: Callable[[BaseEvent], Awaitable[Any]]):
        self.func = func

    async def execute(self, event: BaseEvent) -> Any:
        # do smth
        await asyncio.sleep(5)
        return await self.func(event)


async def answer(event: BotEvent):
    event: BotEvent
    return f"hello, {event.object.object.message.from_id}"


def get_handler(router):
    event_type_filter = EventTypeFilter(BotEventType.MESSAGE_NEW)
    text_filter = TextFilter()
    result_callback = MyCallback(answer)

    simple_handler = router.registrar.new()
    simple_handler.filters = [event_type_filter, text_filter]
    simple_handler.callback = result_callback
    return simple_handler


async def main():
    ...

    simple_handler = get_handler(router)
    router.registrar.register(simple_handler.ready())

    dp.add_router(router)
    await dp.cache_potential_tokens()
    await lp_extension.start()

И еще один.

router = DefaultRouter()


@router.registrar.with_decorator(
    lambda event: event.object.object.message.text == "hello",
    EventTypeFilter(BotEventType.MESSAGE_NEW),
)
def simple_handler(event: BotEvent):
    return f"hello, {event.object.object.message.from_id}"


@router.registrar.with_decorator(
    lambda event: event.object.object.message.text == "bye",
    EventTypeFilter(BotEventType.MESSAGE_NEW),
)
async def another_simple_handler(event: BotEvent):
    return await event.api_ctx.messages.send(
        message=f"bye, {event.object.object.message.from_id}",
        random_id=0,
        user_id=event.object.object.message.from_id,
    )