Skip to content
Socaity Docs

Social Media Content Pipeline

Advanced
30 min

Build an automated content factory that takes a topic and produces a complete social media post: a DeepSeek-written script, FLUX-generated hero images, SpeechCraft voiceover, and a face2face avatar video β€” all in a single Python pipeline.

Uses:deepseek_v3 + flux_schnell + speechcraft + face2face.

Pipeline Architecture

The four stages run in two waves. Stage 1 (script) must complete before Stages 2–4 can start, because they all consume the script text. Stages 2, 3, and 4 are then submitted in parallel.

StageServiceInputOutputWave
1 β€” Script
DeepSeek R1Topic stringScript + scene promptsWave 1
2 β€” Images
FLUX SchnellScene promptsHero images (PNG)Wave 2 (parallel)
3 β€” Voice
SpeechCraftScript textVoiceover (MP3)Wave 2 (parallel)
4 β€” Video
face2faceSource face + target videoFace-swapped video (MP4)Wave 2 (parallel)

Step 1 β€” Install and Configure

Install all required packages and set your API key.

terminal
pip install socaity

Step 2 β€” Stage 1: Write the Script with DeepSeek

Use DeepSeek R1 to generate a short narration script from a topic string. The output is structured into a title, three visual scene descriptions, and a 30-second spoken script.

python
import os
import json
from socaity import deepseek_v3

ds = deepseek_v3(api_key=os.getenv("SOCAITY_API_KEY"))

def write_script(topic: str) -> dict:
    """Return a structured script with scene prompts."""
    prompt = (
        "You are a social media video scriptwriter. "
        "Return ONLY valid JSON with keys: 'title', 'scene_prompts' "
        "(list of 3 image prompts), 'script' (≀ 80 words for 30 s voice).

"
        f"Write a short social media video script about: {topic}"
    )
    response = ds.predictions(prompt=prompt).get_result()
    # deepseek_v3 streams tokens β€” join the chunks into the final string.
    script_text = "".join(response) if isinstance(response, list) else response
    return json.loads(script_text)

# Example
content = write_script("The future of AI in healthcare")
print(content["title"])
# β†’ "AI is Rewriting Medicine β€” Here's What You Need to Know"

Step 3 β€” Stages 2, 3, 4: Submit in Parallel

Submit the FLUX, SpeechCraft, and face2face jobs at the same time. All three run on separate GPUs concurrently β€” total time is dominated by the slowest stage, not the sum.

python
from socaity import flux_schnell
from socaity import speechcraft, face2face

flux = flux_schnell(api_key=os.getenv("SOCAITY_API_KEY"))
sc   = speechcraft(api_key=os.getenv("SOCAITY_API_KEY"))
f2f  = face2face(api_key=os.getenv("SOCAITY_API_KEY"))

def launch_parallel_stages(content: dict, avatar_img: str) -> tuple:
    """Submit all three GPU jobs simultaneously."""

    # Stage 2 β€” generate one hero image per scene prompt
    image_jobs = [
        flux(prompt=p, num_outputs=1)
        for p in content["scene_prompts"]
    ]

    # Stage 3 β€” synthesise the voiceover
    audio_job = sc.text2voice(
        text=content["script"],
        voice="en_male_calm",
    )

    # Stage 4 β€” swap presenter face into a pre-recorded template video clip
    video_job = f2f.swap_video(
        faces=avatar_img,
        target_video="./blank_avatar_30s.mp4",
    )

    return image_jobs, audio_job, video_job

Step 4 β€” Collect Results and Save

Block on each job and save the outputs to a named folder for the topic.

python
import pathlib

def collect_and_save(topic: str, image_jobs, audio_job, video_job) -> str:
    slug = topic.lower().replace(" ", "_")[:30]
    out  = pathlib.Path(f"./output/{slug}")
    out.mkdir(parents=True, exist_ok=True)

    # Block and save images
    for i, job in enumerate(image_jobs):
        imgs = job.get_result()
        imgs[0].save(out / f"scene_{i}.png")

    # Block and save audio
    audio = audio_job.get_result()
    audio.save(out / "voiceover.mp3")

    # Block and save video
    video = video_job.get_result()
    video.save(out / "avatar_video.mp4")

    print(f"Pipeline complete β†’ {out}")
    return str(out)

Full Pipeline Script

The complete runnable script, combining all four stages:

python
import os
import json
import pathlib
from socaity import deepseek_v3
from socaity import speechcraft, face2face
from socaity import flux_schnell

SOCAITY_KEY = os.getenv("SOCAITY_API_KEY")
AVATAR_IMG  = "./avatar.jpg"   # your presenter face

# --- Clients ---
# All four models authenticate with SOCAITY_API_KEY.
# Replicate-backed models (deepseek_v3, flux_schnell) are routed through the
# Socaity backend, which handles the upstream Replicate call for you.
ds   = deepseek_v3(api_key=SOCAITY_KEY)
flux = flux_schnell(api_key=SOCAITY_KEY)
sc   = speechcraft(api_key=SOCAITY_KEY)
f2f  = face2face(api_key=SOCAITY_KEY)

def run_pipeline(topic: str) -> str:
    print(f"[1/4] Writing script for: {topic}")
    resp = ds.predictions(prompt=(
        "You are a social media scriptwriter. Return ONLY valid JSON with keys: "
        "'title', 'scene_prompts' (list[str], 3 items), 'script' (str, ≀ 80 words).

"
        f"Topic: {topic}"
    )).get_result()
    # deepseek_v3 streams tokens β€” join the chunks into the final string.
    resp_text = "".join(resp) if isinstance(resp, list) else resp
    content = json.loads(resp_text)
    print(f"    Title: {content['title']}")

    print("[2-4/4] Submitting image / voice / video jobs in parallel...")
    image_jobs = [flux(prompt=p, num_outputs=1) for p in content["scene_prompts"]]
    audio_job  = sc.text2voice(text=content["script"], voice="en_male_calm")
    # face2face swaps the presenter face into an existing template video clip
    video_job  = f2f.swap_video(faces=AVATAR_IMG, target_video="./blank_30s.mp4")

    slug = topic.lower().replace(" ", "_")[:30]
    out  = pathlib.Path(f"./output/{slug}")
    out.mkdir(parents=True, exist_ok=True)

    for i, job in enumerate(image_jobs):
        job.get_result()[0].save(out / f"scene_{i}.png")
    audio_job.get_result().save(out / "voiceover.mp3")
    video_job.get_result().save(out / "avatar_video.mp4")

    print(f"Done! Output in {out}")
    return str(out)


if __name__ == "__main__":
    run_pipeline("The future of AI in healthcare")

Runtime Breakdown

Approximate runtime per pipeline run at default quality settings (1 image, 30 s audio, 15 s video):

StageServiceGPU / UnitEst. Time
1 β€” ScriptDeepSeek R1CPU / token~3 s
2 β€” Images (Γ—3)FLUX SchnellA10G~5 s
3 β€” Voice (30 s)SpeechCraftT4~4 s
4 β€” Video (15 s)face2faceA10G~12 s

Scaling to Production

To run the pipeline at volume, wrap it in a worker and feed topics from a queue:

python
import queue
import threading

topic_queue: queue.Queue = queue.Queue()

def worker():
    while True:
        topic = topic_queue.get()
        try:
            run_pipeline(topic)
        except Exception as e:
            print(f"Pipeline failed for '{topic}': {e}")
        finally:
            topic_queue.task_done()

# Spawn 4 parallel pipeline workers
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

# Feed topics (e.g. from a database or Kafka)
topics = [
    "The future of AI in healthcare",
    "How to build a personal brand in 2026",
    "Top 5 productivity hacks for founders",
]
for t in topics:
    topic_queue.put(t)

topic_queue.join()
print("All topics processed.")

What You Built

  • A 4-stage AI content pipeline: script β†’ images β†’ voice β†’ video
  • Parallel execution of Stages 2–4 for minimum wall-clock time
  • A worker pattern that handles high-volume production deployment