|
from dataclasses import dataclass |
|
from typing import Any, Iterable, List, Optional, Type |
|
|
|
from temporalio import activity |
|
from temporalio.api.common.v1 import Payload |
|
from temporalio.converter import (CompositePayloadConverter, |
|
DefaultPayloadConverter, |
|
EncodingPayloadConverter, PayloadCodec) |
|
|
|
|
|
@dataclass
class MySpecialObject:
    """Value object holding a single string.

    Instances of this type are what ``MyEncodingPayloadConverter`` turns
    into payloads and reconstructs from them.
    """

    # The only piece of state; stored in the payload data as encoded text.
    foo: str
|
|
|
|
|
# Value of the "encoding" metadata entry set by MyCodec on payloads it wraps.
my_codec_encoding = b"binary/my-codec-encoding"

# Encoding name reported by MyEncodingPayloadConverter.encoding (text form).
my_payload_encoding_str = "text/my-payload-encoding"

# Bytes form of the same name, as stored in and compared against payload
# metadata. Derived from the string above so the two cannot drift apart.
my_payload_encoding = my_payload_encoding_str.encode()
|
|
|
|
|
class MyCodec(PayloadCodec):
    """Codec that wraps payloads carrying ``my_payload_encoding``.

    ``encode`` replaces each matching payload with a new payload whose
    "encoding" metadata is ``my_codec_encoding`` and whose data is a fixed
    marker followed by the serialized original payload. ``decode`` reverses
    that. Payloads with any other encoding pass through untouched in both
    directions.
    """

    # Marker prepended to the serialized payload on encode and required
    # (then stripped) on decode.
    _PREFIX = b"my-encoding: "

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Wrap every payload whose encoding is ``my_payload_encoding``.

        Args:
            payloads: Payloads to process, in order.

        Returns:
            A new list of the same length and order; matching payloads are
            replaced by their wrapped form, all others are the same objects.
        """
        res: List[Payload] = []
        for payload in payloads:
            # We only want to encode payloads that are using our expected
            # payload encoding
            if payload.metadata.get("encoding") != my_payload_encoding:
                res.append(payload)
                continue
            res.append(
                Payload(
                    metadata={"encoding": my_codec_encoding},
                    data=self._PREFIX + payload.SerializeToString(),
                )
            )
        return res

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Unwrap every payload whose encoding is ``my_codec_encoding``.

        Args:
            payloads: Payloads to process, in order.

        Returns:
            A new list of the same length and order; wrapped payloads are
            replaced by the payload parsed from their data, all others are
            the same objects.

        Raises:
            ValueError: If a payload claims ``my_codec_encoding`` but its
                data does not start with the expected marker.
        """
        res: List[Payload] = []
        for payload in payloads:
            # We only want to decode payloads that are using our expected codec
            # encoding
            if payload.metadata.get("encoding") != my_codec_encoding:
                res.append(payload)
                continue
            # Explicit check rather than ``assert``: asserts are removed under
            # ``python -O``, which would let malformed data reach the parser.
            if not payload.data.startswith(self._PREFIX):
                raise ValueError(
                    "payload with codec encoding is missing the expected "
                    f"{self._PREFIX!r} prefix"
                )
            decoded = Payload()
            decoded.ParseFromString(payload.data[len(self._PREFIX):])
            res.append(decoded)
        return res
|
|
|
|
|
class MyEncodingPayloadConverter(EncodingPayloadConverter):
    """Encoding payload converter for ``MySpecialObject`` values.

    Payloads it produces carry ``my_payload_encoding`` in their "encoding"
    metadata and the object's ``foo`` string, encoded to bytes, as data.
    """

    @property
    def encoding(self) -> str:
        """Return the encoding name in string form."""
        return my_payload_encoding_str

    def to_payload(self, value: Any) -> Optional[Payload]:
        """Convert ``value`` to a payload if it is a ``MySpecialObject``.

        Args:
            value: Candidate value to convert.

        Returns:
            A payload holding the encoded ``foo`` string, or ``None`` for
            any value that is not a ``MySpecialObject``.
        """
        if isinstance(value, MySpecialObject):
            raw = value.foo.encode()
            return Payload(
                metadata={"encoding": my_payload_encoding},
                data=raw,
            )
        # We only handle our special object
        return None

    def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
        """Rebuild a ``MySpecialObject`` from ``payload``.

        Args:
            payload: Payload expected to carry ``my_payload_encoding``.
            type_hint: Expected result type; must be ``MySpecialObject``.

        Returns:
            A ``MySpecialObject`` whose ``foo`` is the decoded payload data.
        """
        # We can assume we have the right encoding, and also assert hint
        found_encoding = payload.metadata.get("encoding")
        assert found_encoding == my_payload_encoding
        assert type_hint == MySpecialObject
        text = payload.data.decode()
        return MySpecialObject(text)
|
|
|
|
|
class MyPayloadConverter(CompositePayloadConverter):
    """Composite converter with ``MyEncodingPayloadConverter`` listed first.

    The default encoding payload converters follow it, in their default
    order.
    """

    def __init__(self) -> None:
        """Build the converter sequence and hand it to the base class."""
        converters = [
            MyEncodingPayloadConverter(),
            *DefaultPayloadConverter.default_encoding_payload_converters,
        ]
        super().__init__(*converters)
|
|
|
|
|
@activity.defn
async def my_activity_special_obj(obj: MySpecialObject) -> None:
    """Print the ``MySpecialObject`` this activity was called with."""
    message = f"Called activity with {obj}"
    print(message)
|
|
|
|
|
@activity.defn
async def my_activity_other_obj(obj: str) -> None:
    """Print the string this activity was called with."""
    message = f"Called activity with {obj}"
    print(message)