from multiformats import CID from ipld.car import CarReader from cbor_x import decode, decode_multiple, add_extension MSG_OP = 1 POST_TYPE = 'app.bsky.feed.post' LIKE_TYPE = 'app.bsky.feed.like' FOLLOW_TYPE = 'app.bsky.graph.follow' REPOST_TYPE = 'app.bsky.feed.repost' POST_LINK = 'https://staging.bsky.app/profile/%s/post/%s' PROFILE_LINK = 'https://staging.bsky.app/profile/%s' def main(): socket = WebSocket('wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos') socket.add_event_listener('message', async (event) => { message_buf = await event.data.array_buffer() header, body = decode_multiple(Uint8Array(message_buf)) if header.op != MSG_OP: return car = await CarReader.from_bytes(body.blocks) for op in body.ops: if not op.cid: continue block = await car.get(op.cid) record = decode(block.bytes) wrapper = document.querySelector('.wrapper') if record.get('$type') == POST_TYPE and isinstance(record.get('text'), str): rkey = op.path.split('/')[-1] append_text(wrapper, record.get('text'), format(POST_LINK, body.get('repo'), rkey)) if record.get('$type') == LIKE_TYPE: rkey = record.get('subject', {}).get('uri', '').split('/')[-1] repo = record.get('subject', {}).get('uri', '').split('/')[-3] if rkey and repo: append_action(wrapper, '💖', format(POST_LINK, repo, rkey)) else: append_action(wrapper, '💖') if record.get('$type') == FOLLOW_TYPE: if isinstance(record.get('subject'), str): append_action(wrapper, '🤝', format(PROFILE_LINK, record.get('subject'))) else: append_action(wrapper, '🤝') if record.get('$type') == REPOST_TYPE: rkey = record.get('subject', {}).get('uri', '').split('/')[-1] repo = record.get('subject', {}).get('uri', '').split('/')[-3] if rkey and repo: append_action(wrapper, '🤙', format(POST_LINK, repo, rkey)) else: append_action(wrapper, '🤙') }) def append_text(wrapper, text, href): post = document.createElement('marquee') link = document.createElement('a') link.href = href link.target = '_blank' link.innerText = text post.appendChild(link) wrapper.insertBefore(post, wrapper.firstChild) def append_action(wrapper, char, href): link = document.createElement('a') link.href = href link.target = '_blank' link.innerText = char if wrapper.firstChild.getAttribute('data-bsky-action'): wrapper.firstChild.appendChild(document.createTextNode(' ')) wrapper.firstChild.appendChild(link) else: action = document.createElement('marquee') action.setAttribute('data-bsky-action', '1') action.appendChild(link) wrapper.insertBefore(action, wrapper.firstChild) def format(string, *params): for param in params: string = string.replace('%s', param) return string add_extension( Class=CID, tag=42, encode=lambda: raise_error('cannot encode cids'), decode=lambda bytes_: CID.decode(bytes_[1:]) if bytes_[0] == 0 else raise_error('invalid cid for cbor tag 42') ) main()