prefect orion start error: No such command 'orion'


I am learning workflow orchestration from the Data Engineering Zoomcamp and have successfully ingested the data into PostgreSQL with a Prefect flow. However, when I run the command prefect orion start to open the Prefect UI, I get Error: No such command 'orion'. I have tried reinstalling and setting up the whole project again, but the error persists.

Installed Prefect version (output of prefect version):

Version:             2.14.4
API version:         0.8.4
Python version:      3.9.18
Git commit:          d2cf30f4
Built:               Thu, Nov 9, 2023 4:48 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         ephemeral

The ingest_data.py file:

import os
import argparse
from time import time
import pandas as pd
from sqlalchemy import create_engine
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def extract_data(csv_url, user, password, host, port, db):
    if csv_url.endswith('.csv.gz'):
        csv_name = 'yellow_tripdata_2021-01.csv.gz'
    else:
        csv_name = 'output.csv'

    os.system(f"wget {csv_url} -O {csv_name}")

    df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)

    df = next(df_iter)

    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    
    return df 

@task(log_prints=True)
def transform_data(df):
    print(f"pre: missing passenger count: {df['passenger_count'].isin([0]).sum()}")
    df = df[df['passenger_count'] != 0]
    print(f"post: missing passenger count: {df['passenger_count'].isin([0]).sum()}")
    
    return df 

@task(log_prints=True, retries=3)
def ingest_data(user, password, host, port, db, table_name, df):
    
    postgres_url = f'postgresql://{user}:{password}@{host}:{port}/{db}'
    engine = create_engine(postgres_url)
    
    df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
    df.to_sql(name=table_name, con=engine, if_exists='append')


@flow(name='Subflow', log_prints=True)
def log_subflow(table_name:str):
    # f-string prefix is required so that {table_name} is interpolated
    print(f"Logging subflow for: {table_name}")

@flow(name="Ingest Flow")
def main(table_name:str):
    user = "root"
    password = "root"
    host = "localhost"
    port = "5432"
    db = "ny_taxi"
    csv_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
    
    log_subflow(table_name)
    raw_data = extract_data(csv_url, user, password, host, port, db)
    data = transform_data(raw_data)
    ingest_data(user, password, host, port, db, table_name, data)

if __name__ == '__main__':
    main("yellow_taxi_trips")

1 Answer

After reading the Prefect release notes, I found that references to "Orion" have been replaced with more explicit, conventional nomenclature throughout the codebase. The command group prefect orion was deprecated and renamed to prefect server, so the command to run is prefect server start instead of prefect orion start.
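
If older scripts or course notes still use the old command group, the rename can be sketched as a small mapping. This is only an illustration: modernize_command and LEGACY_GROUPS are hypothetical names, not part of Prefect, and the only fact taken from the release notes is that the orion group became server.

```python
# Hypothetical helper (not part of Prefect): rewrite a legacy CLI invocation
# so that the deprecated `orion` command group becomes `server`.
LEGACY_GROUPS = {"orion": "server"}


def modernize_command(argv):
    """Return a copy of argv with a legacy Prefect command group renamed."""
    if len(argv) >= 2 and argv[0] == "prefect" and argv[1] in LEGACY_GROUPS:
        return [argv[0], LEGACY_GROUPS[argv[1]], *argv[2:]]
    # Anything else is returned unchanged (as a new list).
    return list(argv)


if __name__ == "__main__":
    cmd = modernize_command(["prefect", "orion", "start"])
    # Print the command to run in a terminal; it could also be passed
    # to subprocess.run(cmd) to launch the server from Python.
    print(" ".join(cmd))
```

In practice it is enough to type prefect server start directly in the terminal. With default settings the UI should then be reachable at http://127.0.0.1:4200.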