I am learning workflow orchestration from the Data Engineering Zoomcamp and have successfully ingested the data into PostgreSQL with a Prefect flow. However, when I run the command `prefect orion start` to open the Prefect UI, I get the error `Error: No such command 'orion'`. I have tried everything, including reinstalling Prefect and setting up the whole project again, but the error persists.
Installed Prefect version:
Version: 2.14.4
API version: 0.8.4
Python version: 3.9.18
Git commit: d2cf30f4
Built: Thu, Nov 9, 2023 4:48 PM
OS/Arch: win32/AMD64
Profile: default
Server type: ephemeral
The `ingest_data.py` file:
import os
import argparse
from time import time
import pandas as pd
from sqlalchemy import create_engine
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def extract_data(csv_url, user, password, host, port, db):
    """Download the CSV at ``csv_url`` and return its first chunk as a DataFrame.

    The file is fetched with ``wget`` into the current working directory,
    the first chunk (at most 100,000 rows) is parsed, and the two trip
    timestamp columns are converted to datetimes.

    Args:
        csv_url: URL of the CSV file to download (plain or ``.csv.gz``).
        user: Unused here; kept so existing callers keep working.
        password: Unused here; kept so existing callers keep working.
        host: Unused here; kept so existing callers keep working.
        port: Unused here; kept so existing callers keep working.
        db: Unused here; kept so existing callers keep working.

    Returns:
        DataFrame holding the first chunk of the downloaded file.

    Raises:
        FileNotFoundError: If the ``wget`` executable is not available.
        subprocess.CalledProcessError: If ``wget`` exits with a non-zero
            status, so the task's retries trigger instead of a stale or
            empty local file being parsed.
    """
    # Function-scope import so the file's top-level import block is untouched.
    import subprocess

    # Keep the ".gz" suffix on the local name; pandas' default
    # compression inference relies on the file extension.
    if csv_url.endswith('.csv.gz'):
        csv_name = 'yellow_tripdata_2021-01.csv.gz'
    else:
        csv_name = 'output.csv'

    # Argument list (no shell): the URL is never interpreted by a shell,
    # and check=True surfaces download failures instead of ignoring them.
    subprocess.run(["wget", csv_url, "-O", csv_name], check=True)

    # Only the first chunk is needed; close the reader so the underlying
    # file handle is released even if parsing fails.
    df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)
    try:
        df = next(df_iter)
    finally:
        df_iter.close()

    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    return df
@task(log_prints=True)
def transform_data(df):
    """Drop rows whose ``passenger_count`` is 0 and return the remainder.

    Prints the number of zero-passenger rows before and after filtering.

    Args:
        df: DataFrame containing a ``passenger_count`` column.

    Returns:
        The rows of ``df`` whose ``passenger_count`` is not 0.
    """
    zeros_before = df['passenger_count'].isin([0]).sum()
    print(f"pre: missing passenger count: {zeros_before}")

    has_passengers = df['passenger_count'] != 0
    filtered = df[has_passengers]

    zeros_after = filtered['passenger_count'].isin([0]).sum()
    print(f"post: missing passenger count: {zeros_after}")
    return filtered
@task(log_prints=True, retries=3)
def ingest_data(user, password, host, port, db, table_name, df):
    """Write ``df`` into the PostgreSQL table ``table_name``, replacing it.

    The table is first (re)created from the DataFrame's header alone, then
    all rows are appended.

    Args:
        user: Database user name.
        password: Database password.
        host: Database host.
        port: Database port.
        db: Database name.
        table_name: Name of the destination table.
        df: DataFrame to load.
    """
    postgres_url = f'postgresql://{user}:{password}@{host}:{port}/{db}'
    engine = create_engine(postgres_url)
    try:
        # Zero-row write with if_exists='replace' drops any existing table
        # and recreates it with the DataFrame's schema.
        df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
        df.to_sql(name=table_name, con=engine, if_exists='append')
    finally:
        # Release pooled connections; the engine is local to this call and
        # would otherwise hold them open until garbage collection.
        engine.dispose()
@flow(name='Subflow', log_prints=True)
def log_subflow(table_name: str):
    """Print a log line naming the table the parent flow is ingesting.

    Args:
        table_name: Name of the destination table.
    """
    # f-string prefix is required; without it the literal text
    # "{table_name}" is printed instead of the value.
    print(f"Logging subflow for:{table_name}")
@flow(name="Ingest Flow")
def main(table_name: str):
    """Run the ingestion pipeline: log, extract, transform, then load.

    Args:
        table_name: Name of the PostgreSQL table to load the data into.
    """
    # Connection settings shared by the extract and load steps.
    connection = {
        "user": "root",
        "password": "root",
        "host": "localhost",
        "port": "5432",
        "db": "ny_taxi",
    }
    source_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"

    log_subflow(table_name)

    extracted = extract_data(source_url, **connection)
    cleaned = transform_data(extracted)
    ingest_data(table_name=table_name, df=cleaned, **connection)
# Script entry point: run the ingestion flow for the yellow taxi table.
if __name__ == "__main__":
    main(table_name="yellow_taxi_trips")
After reading the Prefect RELEASE-NOTES, I found that references to "Orion" have been replaced with more explicit, conventional nomenclature throughout the codebase. The command group `prefect orion` is deprecated and has been renamed to `prefect server`, so the UI is now started with `prefect server start`.