Social Media Content Pipeline
Build an automated content factory that takes a topic and produces a complete social media post: a DeepSeek-written script, FLUX-generated hero images, SpeechCraft voiceover, and a face2face avatar video — all in a single Python pipeline.
Uses:deepseek_v3 + flux_schnell + speechcraft + face2face.
Note: the script stage calls ds.predictions() (not ds.chat()), and the face2face stage has been revised — see the note in Step 3. The four stages run in two waves. Stage 1 (script) must complete before Stages 2–4 can start, because they all consume the script text. Stages 2, 3, and 4 are then submitted in parallel.
| Stage | Service | Input | Output | Wave |
|---|---|---|---|---|
| 1 — Script | DeepSeek V3 | Topic string | Script + scene prompts | Wave 1 |
| 2 — Images | FLUX Schnell | Scene prompts | Hero images (PNG) | Wave 2 (parallel) |
| 3 — Voice | SpeechCraft | Script text | Voiceover (MP3) | Wave 2 (parallel) |
| 4 — Video | face2face | Source face + target video | Face-swapped video (MP4) | Wave 2 (parallel) |
Install all required packages and set your API key.
pip install socaity
Use DeepSeek V3 to generate a short narration script from a topic string. The output is structured into a title, three visual scene descriptions, and a 30-second spoken script.
import os
import json
from socaity import deepseek_v3
ds = deepseek_v3(api_key=os.getenv("SOCAITY_API_KEY"))
def write_script(topic: str) -> dict:
    """Return a structured script with scene prompts.

    Sends a JSON-only scriptwriting prompt for ``topic`` to the module-level
    ``ds`` client and parses the reply.

    Args:
        topic: Subject of the social media video.

    Returns:
        The parsed JSON reply. The prompt asks the model for the keys
        ``'title'``, ``'scene_prompts'`` (3 image prompts) and ``'script'``;
        the reply is not validated against that schema here.

    Raises:
        json.JSONDecodeError: If the reply contains no parseable JSON.
    """
    prompt = (
        "You are a social media video scriptwriter. "
        "Return ONLY valid JSON with keys: 'title', 'scene_prompts' "
        "(list of 3 image prompts), 'script' (≤ 80 words for 30 s voice).\n"
        f"Write a short social media video script about: {topic}"
    )
    response = ds.predictions(prompt=prompt).get_result()
    # The result may arrive as a list of streamed chunks — join them into
    # the final string.
    script_text = "".join(response) if isinstance(response, list) else response
    # Models frequently wrap JSON in Markdown fences or add surrounding
    # chatter despite the instruction; keep only the outermost {...} span.
    if isinstance(script_text, str):
        start, end = script_text.find("{"), script_text.rfind("}")
        if start != -1 and end > start:
            script_text = script_text[start:end + 1]
    return json.loads(script_text)
# Example
content = write_script("The future of AI in healthcare")
print(content["title"])
# → "AI is Rewriting Medicine — Here's What You Need to Know"
Submit the FLUX, SpeechCraft, and face2face jobs at the same time. All three run on separate GPUs concurrently — total time is dominated by the slowest stage, not the sum.
face2face is a face swapper — it replaces faces in an existing video clip; it does not animate a face from audio. In this pipeline, it transplants your presenter face onto a pre-recorded template video. For true talking-head animation from audio, a dedicated lip-sync model would be required (not currently in this tutorial).
from socaity import flux_schnell
from socaity import speechcraft, face2face
flux = flux_schnell(api_key=os.getenv("SOCAITY_API_KEY"))
sc = speechcraft(api_key=os.getenv("SOCAITY_API_KEY"))
f2f = face2face(api_key=os.getenv("SOCAITY_API_KEY"))
def launch_parallel_stages(
    content: dict,
    avatar_img: str,
    voice: str = "en_male_calm",
    template_video: str = "./blank_avatar_30s.mp4",
) -> tuple:
    """Submit the image, voice and video jobs for one script.

    Results are not fetched here; the returned job objects are resolved
    later by calling ``get_result()`` on each of them.

    Args:
        content: Parsed script; ``content["scene_prompts"]`` and
            ``content["script"]`` are read.
        avatar_img: Presenter face image passed to face2face as ``faces``.
        voice: Voice name passed to SpeechCraft.
        template_video: Pre-recorded clip the presenter face is swapped into.

    Returns:
        ``(image_jobs, audio_job, video_job)`` where ``image_jobs`` is a
        list with one job per scene prompt.
    """
    # Stage 2 — generate one hero image per scene prompt
    image_jobs = [
        flux(prompt=scene_prompt, num_outputs=1)
        for scene_prompt in content["scene_prompts"]
    ]
    # Stage 3 — synthesise the voiceover
    audio_job = sc.text2voice(
        text=content["script"],
        voice=voice,
    )
    # Stage 4 — swap presenter face into a pre-recorded template video clip
    video_job = f2f.swap_video(
        faces=avatar_img,
        target_video=template_video,
    )
    return image_jobs, audio_job, video_job


# Block on each job and save the outputs to a named folder for the topic.
import pathlib
def collect_and_save(topic: str, image_jobs, audio_job, video_job) -> str:
    """Wait for every job and save its output under ``./output/<slug>``.

    Args:
        topic: Topic string; used only to derive the output folder name.
        image_jobs: Iterable of jobs whose ``get_result()`` yields a sequence
            of images; the first image of each is saved as ``scene_<i>.png``.
        audio_job: Job whose result is saved as ``voiceover.mp3``.
        video_job: Job whose result is saved as ``avatar_video.mp4``.

    Returns:
        The output directory as a string.
    """
    import re  # local import so the snippet stays self-contained

    # Replace every character that is not a word character or hyphen so a
    # topic containing "/", ".." or ":" cannot escape ./output or produce an
    # invalid path. Plain alphanumeric-and-space topics map to the same slug
    # as before.
    slug = re.sub(r"[^\w-]", "_", topic.lower())[:30] or "untitled"
    out = pathlib.Path(f"./output/{slug}")
    out.mkdir(parents=True, exist_ok=True)
    # Block and save images
    for i, job in enumerate(image_jobs):
        imgs = job.get_result()
        imgs[0].save(out / f"scene_{i}.png")
    # Block and save audio
    audio = audio_job.get_result()
    audio.save(out / "voiceover.mp3")
    # Block and save video
    video = video_job.get_result()
    video.save(out / "avatar_video.mp4")
    print(f"Pipeline complete → {out}")
    return str(out)


# The complete runnable script, combining all four stages:
import os
import json
import pathlib
from socaity import deepseek_v3
from socaity import speechcraft, face2face
from socaity import flux_schnell
# API key read from the environment; os.getenv returns None when the
# variable is unset.
SOCAITY_KEY = os.getenv("SOCAITY_API_KEY")
# Image passed to face2face as the `faces` argument in run_pipeline.
AVATAR_IMG = "./avatar.jpg" # your presenter face
# --- Clients ---
# All four models authenticate with SOCAITY_API_KEY.
# Replicate-backed models (deepseek_v3, flux_schnell) are routed through the
# Socaity backend, which handles the upstream Replicate call for you.
# One client per stage: script (ds), images (flux), voice (sc), video (f2f).
ds = deepseek_v3(api_key=SOCAITY_KEY)
flux = flux_schnell(api_key=SOCAITY_KEY)
sc = speechcraft(api_key=SOCAITY_KEY)
f2f = face2face(api_key=SOCAITY_KEY)
def run_pipeline(topic: str) -> str:
    """Run all four stages for ``topic`` and save the results.

    Stage 1 (script) is awaited first because the other stages consume its
    output. The image, voice and video jobs are then submitted before any of
    their results is collected.

    Args:
        topic: Subject of the social media post; also used to derive the
            output folder name.

    Returns:
        The output directory (``./output/<slug>``) as a string.

    Raises:
        json.JSONDecodeError: If the script reply contains no parseable JSON.
    """
    import re  # local import so the function does not rely on extra file-level imports

    print(f"[1/4] Writing script for: {topic}")
    resp = ds.predictions(prompt=(
        "You are a social media scriptwriter. Return ONLY valid JSON with keys: "
        "'title', 'scene_prompts' (list[str], 3 items), 'script' (str, ≤ 80 words).\n"
        f"Topic: {topic}"
    )).get_result()
    # The result may arrive as a list of streamed chunks — join them into
    # the final string.
    resp_text = "".join(resp) if isinstance(resp, list) else resp
    # Models frequently wrap JSON in Markdown fences or add surrounding
    # chatter despite the instruction; keep only the outermost {...} span.
    if isinstance(resp_text, str):
        start, end = resp_text.find("{"), resp_text.rfind("}")
        if start != -1 and end > start:
            resp_text = resp_text[start:end + 1]
    content = json.loads(resp_text)
    print(f" Title: {content['title']}")

    print("[2-4/4] Submitting image / voice / video jobs in parallel...")
    image_jobs = [flux(prompt=p, num_outputs=1) for p in content["scene_prompts"]]
    audio_job = sc.text2voice(text=content["script"], voice="en_male_calm")
    # face2face swaps the presenter face into an existing template video clip
    video_job = f2f.swap_video(faces=AVATAR_IMG, target_video="./blank_30s.mp4")

    # Replace every character that is not a word character or hyphen so a
    # topic containing "/", ".." or ":" cannot escape ./output or produce an
    # invalid path. Plain alphanumeric-and-space topics map to the same slug
    # as before.
    slug = re.sub(r"[^\w-]", "_", topic.lower())[:30] or "untitled"
    out = pathlib.Path(f"./output/{slug}")
    out.mkdir(parents=True, exist_ok=True)

    # Each get_result() call blocks until that job has finished.
    for i, job in enumerate(image_jobs):
        job.get_result()[0].save(out / f"scene_{i}.png")
    audio_job.get_result().save(out / "voiceover.mp3")
    video_job.get_result().save(out / "avatar_video.mp4")

    print(f"Done! Output in {out}")
    return str(out)
if __name__ == "__main__":
    run_pipeline("The future of AI in healthcare")
Approximate runtime per pipeline run at default quality settings (1 image, 30 s audio, 15 s video):
| Stage | Service | GPU / Unit | Est. Time |
|---|---|---|---|
| 1 — Script | DeepSeek V3 | CPU / token | ~3 s |
| 2 — Images (×3) | FLUX Schnell | A10G | ~5 s |
| 3 — Voice (30 s) | SpeechCraft | T4 | ~4 s |
| 4 — Video (15 s) | face2face | A10G | ~12 s |
To run the pipeline at volume, wrap it in a worker and feed topics from a queue:
import queue
import threading
topic_queue: queue.Queue = queue.Queue()
def worker():
    """Consume topics from ``topic_queue`` forever, one pipeline run each.

    A failing run is reported on stdout and does not stop the loop; every
    dequeued topic is marked done whether or not its run succeeded.
    """
    while True:
        topic = topic_queue.get()
        try:
            run_pipeline(topic)
        except Exception as e:
            # Report and move on so one bad topic does not kill the worker.
            print(f"Pipeline failed for '{topic}': {e}")
        finally:
            # Always acknowledge the item, otherwise topic_queue.join()
            # would wait forever after a failure.
            topic_queue.task_done()
# Spawn 4 parallel pipeline workers (daemon threads, so they do not keep the
# process alive once the main thread finishes)
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

# Feed topics (e.g. from a database or Kafka)
topics = [
    "The future of AI in healthcare",
    "How to build a personal brand in 2026",
    "Top 5 productivity hacks for founders",
]
for t in topics:
    topic_queue.put(t)

# Wait until every queued topic has been marked done by a worker.
topic_queue.join()
print("All topics processed.")
- A 4-stage AI content pipeline: script → images → voice → video
- Parallel execution of Stages 2–4 for minimum wall-clock time
- A worker pattern that handles high-volume production deployment