import os
from literalai import LiteralClient
# Shared Literal AI client for this script; the API key is looked up in the
# LITERAL_API_KEY environment variable (None is passed when it is unset).
client = LiteralClient(api_key=os.environ.get("LITERAL_API_KEY"))
@client.step(type="tool")
def my_tool() -> str:
    """Example step registered on the client with type ``"tool"``.

    Returns:
        The literal string ``"Success"``.
    """
    # Implement your custom logic here
    return "Success"
# You can change the step type
@client.step(type="run")
def my_run() -> str:
    """Example step registered on the client with type ``"run"``.

    Calls ``my_tool()`` from inside this step and then returns.

    Returns:
        The literal string ``"Success"``.
    """
    # Implement your custom logic here
    my_tool()
    return "Success"
# Open a thread context, post one assistant message, then execute the run.
# NOTE(review): my_run() is placed inside the thread context, presumably so
# that its steps are attached to this thread; confirm against the SDK docs.
with client.thread() as thread:
    client.message(content="Hello World", type="assistant_message", name="My Assistant")
    my_run()
# The SDK performs its network requests asynchronously.
# Call flush_and_stop() to guarantee that all requests complete before the
# process terminates.
# WARNING: Do not call this method if you run a continuous server.
client.flush_and_stop()